diff --git a/worker/worker.go b/worker/worker.go index 37363c845716cba803308278399d39712d20d5eb..5175a9b5ac154b53155ea488afe4c6fb659cded4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -83,32 +83,31 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { if runHere { return ProcessTask(qu) - } else { // Send the request to instance 0 which has uidstore - pool := pools[idx] - addr := addrs[idx] - query := new(conn.Query) - query.Data = qu - reply := new(conn.Reply) - if err := pool.Call("Worker.ServeTask", query, reply); err != nil { - glog.WithField("call", "Worker.ServeTask").Fatal(err) - } - glog.WithField("reply", string(reply.Data)).WithField("addr", addr). - Info("Got reply from server") - return reply.Data, nil } + + pool := pools[idx] + addr := addrs[idx] + query := new(conn.Query) + query.Data = qu + reply := new(conn.Reply) + if err := pool.Call("Worker.ServeTask", query, reply); err != nil { + glog.WithField("call", "Worker.ServeTask").Fatal(err) + } + glog.WithField("reply", string(reply.Data)).WithField("addr", addr). + Info("Got reply from server") + return reply.Data, nil } func ProcessTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) + attr := string(q.Attr()) b := flatbuffers.NewBuilder(0) voffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) - attr := string(q.Attr()) - for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := posting.Key(uid, attr) @@ -180,8 +179,8 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { uo := flatbuffers.GetUOffsetT(query.Data) q := new(task.Query) q.Init(query.Data, uo) - attr := string(q.Attr()) + if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = ProcessTask(query.Data) } else {