diff --git a/gql/parser.go b/gql/parser.go index 2ba5654f32f31986dc184d7c39768e6f86cf7958..ae511c2c1726c83ab092faaa757867498c1471a7 100644 --- a/gql/parser.go +++ b/gql/parser.go @@ -23,7 +23,6 @@ import ( "github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/query" - "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" ) diff --git a/loader/loader.go b/loader/loader.go index 7af54bb8ef606847b5e4dedd0fff5f993ed004d3..5f3ad2d97fad143fe15d2b60543670c64bd6eef1 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -137,10 +137,16 @@ func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) { edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, rStore) } - key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key, rwStore) - plist.AddMutation(edge, posting.Set) - atomic.AddUint64(&s.ctr.processed, 1) + // Only handle this edge if the attribute satisfies the modulo rule + if farm.Fingerprint64([]byte(nq.Attribute))%s.numInstances != s.instanceIdx { + key := posting.Key(edge.Entity, edge.Attribute) + plist := posting.GetOrCreate(key, rwStore) + plist.AddMutation(edge, posting.Set) + atomic.AddUint64(&s.ctr.processed, 1) + } else { + atomic.AddUint64(&s.ctr.ignored, 1) + } + } wg.Done() } diff --git a/query/query.go b/query/query.go index aa4fd4c4cf1341ca773b5ed5e4452a3fa868f945..74539534fb0067f2fa6bf0a5835a2b754c025982 100644 --- a/query/query.go +++ b/query/query.go @@ -252,11 +252,11 @@ func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) { return json.Marshal(r) } -func NewGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) { +func NewGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") @@ -382,11 +382,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { +func ProcessGraph(sg *SubGraph, rch chan error) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query, pstore) + sg.result, err = posting.ProcessTask(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err @@ -425,7 +425,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.query = createTaskQuery(child.Attr, sorted) - go ProcessGraph(child, childchan, pstore) + go ProcessGraph(child, childchan) } // Now get all the results back. diff --git a/server/main.go b/server/main.go index 2089271cc17ab9ac6a6c57792010a7edd48303f3..60fee1f0d37b16b48499651227d4dbf70cce51a5 100644 --- a/server/main.go +++ b/server/main.go @@ -125,7 +125,7 @@ func main() { clog.Init() defer clog.Close() - posting.Init(clog) + posting.Init(ps, clog) http.HandleFunc("/query", queryHandler) glog.WithField("port", *port).Info("Listening for requests...")