diff --git a/query/query.go b/query/query.go index 0e23c13804cde7ac067e18d3ea4841d36cc55902..cd627e90e9d15d380c344632f4be65f8733e1a5a 100644 --- a/query/query.go +++ b/query/query.go @@ -403,7 +403,6 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { func ProcessGraph(sg *SubGraph, rch chan error) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { - // This task execution would go over the wire in later versions. sg.result, err = worker.ProcessTaskOverNetwork(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") @@ -416,6 +415,14 @@ func ProcessGraph(sg *SubGraph, rch chan error) { r := new(task.Result) r.Init(sg.result, uo) + if r.ValuesLength() > 0 { + var v task.Value + if r.Values(&v, 0) { + glog.WithField("attr", sg.Attr).WithField("val", string(v.ValBytes())). + Info("Sample value") + } + } + sorted, err := sortedUniqueUids(r) if err != nil { x.Err(glog, err).Error("While processing task.") diff --git a/tools/dlist/main.go b/tools/dlist/main.go index abd471aebd057a56eab1b2235c13bd7ed507c606..a339c7d6ebc2a4b860957f8a26a8167d7c518bb1 100644 --- a/tools/dlist/main.go +++ b/tools/dlist/main.go @@ -101,5 +101,8 @@ func main() { if err != nil { glog.WithError(err).Fatal("Unable to get key") } + if len(val) == 0 { + glog.Fatal("Unable to find posting list") + } output(val) } diff --git a/worker/task.go b/worker/task.go index f6ce950e141cd9b5e86b254a9070ab2c2f09c6b4..90a26a9bca692b3a4f666e98578af76e606f7fde 100644 --- a/worker/task.go +++ b/worker/task.go @@ -51,7 +51,12 @@ func processTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) + attr := string(q.Attr()) + store := dataStore + if attr == "_xid_" { + store = uidStore + } b := flatbuffers.NewBuilder(0) voffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) @@ -60,7 +65,7 @@ func processTask(query []byte) (result []byte, rerr error) { for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := posting.Key(uid, attr) - pl := posting.GetOrCreate(key, dataStore) + pl := posting.GetOrCreate(key, store) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { diff --git a/worker/worker.go b/worker/worker.go index f572caf0092c54f84722ba40880956a45f98f4b2..f6816e202398fd99aee47401c70a27ec2f59e3bf 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -126,9 +126,10 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) - glog.WithField("attr", attr).WithField("instanceIdx", instanceIdx).Info("ServeTask") + glog.WithField("attr", attr).WithField("num_uids", q.UidsLength()). + WithField("instanceIdx", instanceIdx).Info("ServeTask") - if attr == "_xid_" || + if (instanceIdx == 0 && attr == "_xid_") || farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = processTask(query.Data)