diff --git a/loader/loader.go b/loader/loader.go index 662f9f753e782238c1308b23cefdd4aebef12cdc..dc37c90a6675c6e774be4604baa2fa59ed170240 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -145,13 +145,8 @@ func (s *state) parseStream(wg *sync.WaitGroup) { s.SetError(err) return } - if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances == - s.instanceIdx { - s.cnq <- nq - atomic.AddUint64(&s.ctr.parsed, 1) - } else { - atomic.AddUint64(&s.ctr.ignored, 1) - } + s.cnq <- nq + atomic.AddUint64(&s.ctr.parsed, 1) } } @@ -162,6 +157,12 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { if s.Error() != nil { return } + // Only handle this edge if the attribute satisfies the modulo rule + if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances != s.instanceIdx { + atomic.AddUint64(&s.ctr.ignored, 1) + continue + } + edge, err := nq.ToEdge() for err != nil { // Just put in a retry loop to tackle temporary errors. @@ -177,11 +178,6 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { edge, err = nq.ToEdge() } - // Only handle this edge if the attribute satisfies the modulo rule - if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances != - s.instanceIdx { - glog.WithField("edge", edge).Fatal("We shouldn't be receiving this edge.") - } key := posting.Key(edge.Entity, edge.Attribute) plist := posting.GetOrCreate(key, dataStore) plist.AddMutation(edge, posting.Set)