Skip to content
Snippets Groups Projects
Commit c54bf33a authored by Manish R Jain's avatar Manish R Jain
Browse files

Fix a bug due to which we were unable to retrieve xids.

parent 62b6b8fd
No related branches found
No related tags found
No related merge requests found
...@@ -403,7 +403,6 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { ...@@ -403,7 +403,6 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
func ProcessGraph(sg *SubGraph, rch chan error) { func ProcessGraph(sg *SubGraph, rch chan error) {
var err error var err error
if len(sg.query) > 0 && sg.Attr != "_root_" { if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions.
sg.result, err = worker.ProcessTaskOverNetwork(sg.query) sg.result, err = worker.ProcessTaskOverNetwork(sg.query)
if err != nil { if err != nil {
x.Err(glog, err).Error("While processing task.") x.Err(glog, err).Error("While processing task.")
...@@ -416,6 +415,14 @@ func ProcessGraph(sg *SubGraph, rch chan error) { ...@@ -416,6 +415,14 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
r := new(task.Result) r := new(task.Result)
r.Init(sg.result, uo) r.Init(sg.result, uo)
if r.ValuesLength() > 0 {
var v task.Value
if r.Values(&v, 0) {
glog.WithField("attr", sg.Attr).WithField("val", string(v.ValBytes())).
Info("Sample value")
}
}
sorted, err := sortedUniqueUids(r) sorted, err := sortedUniqueUids(r)
if err != nil { if err != nil {
x.Err(glog, err).Error("While processing task.") x.Err(glog, err).Error("While processing task.")
......
...@@ -101,5 +101,8 @@ func main() { ...@@ -101,5 +101,8 @@ func main() {
if err != nil { if err != nil {
glog.WithError(err).Fatal("Unable to get key") glog.WithError(err).Fatal("Unable to get key")
} }
if len(val) == 0 {
glog.Fatal("Unable to find posting list")
}
output(val) output(val)
} }
...@@ -51,7 +51,12 @@ func processTask(query []byte) (result []byte, rerr error) { ...@@ -51,7 +51,12 @@ func processTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query) uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query) q := new(task.Query)
q.Init(query, uo) q.Init(query, uo)
attr := string(q.Attr()) attr := string(q.Attr())
store := dataStore
if attr == "_xid_" {
store = uidStore
}
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
voffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
...@@ -60,7 +65,7 @@ func processTask(query []byte) (result []byte, rerr error) { ...@@ -60,7 +65,7 @@ func processTask(query []byte) (result []byte, rerr error) {
for i := 0; i < q.UidsLength(); i++ { for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i) uid := q.Uids(i)
key := posting.Key(uid, attr) key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore) pl := posting.GetOrCreate(key, store)
var valoffset flatbuffers.UOffsetT var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil { if val, err := pl.Value(); err != nil {
......
...@@ -126,9 +126,10 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { ...@@ -126,9 +126,10 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
q := new(task.Query) q := new(task.Query)
q.Init(query.Data, uo) q.Init(query.Data, uo)
attr := string(q.Attr()) attr := string(q.Attr())
glog.WithField("attr", attr).WithField("instanceIdx", instanceIdx).Info("ServeTask") glog.WithField("attr", attr).WithField("num_uids", q.UidsLength()).
WithField("instanceIdx", instanceIdx).Info("ServeTask")
if attr == "_xid_" || if (instanceIdx == 0 && attr == "_xid_") ||
farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
reply.Data, rerr = processTask(query.Data) reply.Data, rerr = processTask(query.Data)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment