From 645535ffbfa4e7ce921ac9c011476c342a57ae39 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Wed, 9 Mar 2016 12:52:35 +1100
Subject: [PATCH] Move the fp mod out of parseStream, as it's shared between
 loader and uid assigner.

---
 loader/loader.go | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/loader/loader.go b/loader/loader.go
index 662f9f75..dc37c90a 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -145,13 +145,8 @@ func (s *state) parseStream(wg *sync.WaitGroup) {
 			s.SetError(err)
 			return
 		}
-		if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances ==
-			s.instanceIdx {
-			s.cnq <- nq
-			atomic.AddUint64(&s.ctr.parsed, 1)
-		} else {
-			atomic.AddUint64(&s.ctr.ignored, 1)
-		}
+		s.cnq <- nq
+		atomic.AddUint64(&s.ctr.parsed, 1)
 	}
 }
 
@@ -162,6 +157,12 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 		if s.Error() != nil {
 			return
 		}
+		// Only handle this edge if the attribute satisfies the modulo rule
+		if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances != s.instanceIdx {
+			atomic.AddUint64(&s.ctr.ignored, 1)
+			continue
+		}
+
 		edge, err := nq.ToEdge()
 		for err != nil {
 			// Just put in a retry loop to tackle temporary errors.
@@ -177,11 +178,6 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 			edge, err = nq.ToEdge()
 		}
 
-		// Only handle this edge if the attribute satisfies the modulo rule
-		if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances !=
-			s.instanceIdx {
-			glog.WithField("edge", edge).Fatal("We shouldn't be receiving this edge.")
-		}
 		key := posting.Key(edge.Entity, edge.Attribute)
 		plist := posting.GetOrCreate(key, dataStore)
 		plist.AddMutation(edge, posting.Set)
-- 
GitLab