From 68c01f368e8ec6b630ca721daee9d31c1ba3d7cd Mon Sep 17 00:00:00 2001 From: Ashwin <ashwin2007ray@gmail.com> Date: Fri, 12 Feb 2016 15:53:59 +1100 Subject: [PATCH] Add mod sharding for attributes --- gql/parser.go | 1 - loader/loader.go | 14 ++++++++++---- query/query.go | 10 +++++----- server/main.go | 2 +- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/gql/parser.go b/gql/parser.go index 2ba5654f..ae511c2c 100644 --- a/gql/parser.go +++ b/gql/parser.go @@ -23,7 +23,6 @@ import ( "github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/query" - "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" ) diff --git a/loader/loader.go b/loader/loader.go index 7af54bb8..5f3ad2d9 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -137,10 +137,16 @@ func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) { edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, rStore) } - key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key, rwStore) - plist.AddMutation(edge, posting.Set) - atomic.AddUint64(&s.ctr.processed, 1) + // Only handle this edge if the attribute belongs to this instance, i.e. fingerprint(attribute) % numInstances == instanceIdx; all other edges are counted as ignored. + if farm.Fingerprint64([]byte(nq.Attribute))%s.numInstances == s.instanceIdx { + key := posting.Key(edge.Entity, edge.Attribute) + plist := posting.GetOrCreate(key, rwStore) + plist.AddMutation(edge, posting.Set) + atomic.AddUint64(&s.ctr.processed, 1) + } else { + atomic.AddUint64(&s.ctr.ignored, 1) + } + } wg.Done() } diff --git a/query/query.go b/query/query.go index aa4fd4c4..74539534 100644 --- a/query/query.go +++ b/query/query.go @@ -252,11 +252,11 @@ func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) { return json.Marshal(r) } -func NewGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) { +func NewGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. 
if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") @@ -382,11 +382,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { +func ProcessGraph(sg *SubGraph, rch chan error) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query, pstore) + sg.result, err = posting.ProcessTask(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err @@ -425,7 +425,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.query = createTaskQuery(child.Attr, sorted) - go ProcessGraph(child, childchan, pstore) + go ProcessGraph(child, childchan) } // Now get all the results back. diff --git a/server/main.go b/server/main.go index 2089271c..60fee1f0 100644 --- a/server/main.go +++ b/server/main.go @@ -125,7 +125,7 @@ func main() { clog.Init() defer clog.Close() - posting.Init(clog) + posting.Init(ps, clog) http.HandleFunc("/query", queryHandler) glog.WithField("port", *port).Info("Listening for requests...") -- GitLab