Skip to content
Snippets Groups Projects
Commit 645535ff authored by Manish R Jain's avatar Manish R Jain
Browse files

Move the fp mod out of parseStream, as it's shared between loader and uid assigner.

parent bb8c4080
No related branches found
No related tags found
No related merge requests found
......@@ -145,13 +145,8 @@ func (s *state) parseStream(wg *sync.WaitGroup) {
s.SetError(err)
return
}
if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances ==
s.instanceIdx {
s.cnq <- nq
atomic.AddUint64(&s.ctr.parsed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
s.cnq <- nq
atomic.AddUint64(&s.ctr.parsed, 1)
}
}
......@@ -162,6 +157,12 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
if s.Error() != nil {
return
}
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances != s.instanceIdx {
atomic.AddUint64(&s.ctr.ignored, 1)
continue
}
edge, err := nq.ToEdge()
for err != nil {
// Just put in a retry loop to tackle temporary errors.
......@@ -177,11 +178,6 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
edge, err = nq.ToEdge()
}
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances !=
s.instanceIdx {
glog.WithField("edge", edge).Fatal("We shouldn't be receiving this edge.")
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment