Skip to content
Snippets Groups Projects
Commit 68c01f36 authored by Ashwin's avatar Ashwin
Browse files

Add mod sharding for attributes

parent 90140682
No related branches found
No related tags found
No related merge requests found
......@@ -23,7 +23,6 @@ import (
"github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
)
......
......@@ -137,10 +137,16 @@ func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) {
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, rStore)
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, rwStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(nq.Attribute))%s.numInstances != s.instanceIdx {
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, rwStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
}
wg.Done()
}
......
......@@ -252,11 +252,11 @@ func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) {
return json.Marshal(r)
}
func NewGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
func NewGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
if len(exid) > 0 {
u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default
u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default
if err != nil {
x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id")
......@@ -382,11 +382,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
return sorted, nil
}
func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
func ProcessGraph(sg *SubGraph, rch chan error) {
var err error
if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query, pstore)
sg.result, err = posting.ProcessTask(sg.query)
if err != nil {
x.Err(glog, err).Error("While processing task.")
rch <- err
......@@ -425,7 +425,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]
child.query = createTaskQuery(child.Attr, sorted)
go ProcessGraph(child, childchan, pstore)
go ProcessGraph(child, childchan)
}
// Now get all the results back.
......
......@@ -125,7 +125,7 @@ func main() {
clog.Init()
defer clog.Close()
posting.Init(clog)
posting.Init(ps, clog)
http.HandleFunc("/query", queryHandler)
glog.WithField("port", *port).Info("Listening for requests...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment