From c54bf33a5aca5ac41a807f697f3e45c0c4abc30d Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Tue, 8 Mar 2016 07:00:47 +0000 Subject: [PATCH] Fix a bug due to which we were unable to retrieve xids. --- query/query.go | 9 ++++++++- tools/dlist/main.go | 3 +++ worker/task.go | 7 ++++++- worker/worker.go | 5 +++-- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/query/query.go b/query/query.go index 0e23c138..cd627e90 100644 --- a/query/query.go +++ b/query/query.go @@ -403,7 +403,6 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { func ProcessGraph(sg *SubGraph, rch chan error) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { - // This task execution would go over the wire in later versions. sg.result, err = worker.ProcessTaskOverNetwork(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") @@ -416,6 +415,14 @@ func ProcessGraph(sg *SubGraph, rch chan error) { r := new(task.Result) r.Init(sg.result, uo) + if r.ValuesLength() > 0 { + var v task.Value + if r.Values(&v, 0) { + glog.WithField("attr", sg.Attr).WithField("val", string(v.ValBytes())). + Info("Sample value") + } + } + sorted, err := sortedUniqueUids(r) if err != nil { x.Err(glog, err).Error("While processing task.") diff --git a/tools/dlist/main.go b/tools/dlist/main.go index abd471ae..a339c7d6 100644 --- a/tools/dlist/main.go +++ b/tools/dlist/main.go @@ -101,5 +101,8 @@ func main() { if err != nil { glog.WithError(err).Fatal("Unable to get key") } + if len(val) == 0 { + glog.Fatal("Unable to find posting list") + } output(val) } diff --git a/worker/task.go b/worker/task.go index f6ce950e..90a26a9b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -51,7 +51,12 @@ func processTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) + attr := string(q.Attr()) + store := dataStore + if attr == "_xid_" { + store = uidStore + } b := flatbuffers.NewBuilder(0) voffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) @@ -60,7 +65,7 @@ func processTask(query []byte) (result []byte, rerr error) { for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := posting.Key(uid, attr) - pl := posting.GetOrCreate(key, dataStore) + pl := posting.GetOrCreate(key, store) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { diff --git a/worker/worker.go b/worker/worker.go index f572caf0..f6816e20 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -126,9 +126,10 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) - glog.WithField("attr", attr).WithField("instanceIdx", instanceIdx).Info("ServeTask") + glog.WithField("attr", attr).WithField("num_uids", q.UidsLength()). + WithField("instanceIdx", instanceIdx).Info("ServeTask") - if attr == "_xid_" || + if (instanceIdx == 0 && attr == "_xid_") || farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = processTask(query.Data) -- GitLab