diff --git a/server/main.go b/server/main.go index c89df478b3c33046fea7c3aea68bd7dce85becd3..f5e61bdde762e95af250aa8d5d1ad6f48464dc39 100644 --- a/server/main.go +++ b/server/main.go @@ -144,18 +144,19 @@ func main() { defer clog.Close() addrs := strings.Split(*workers, ",") + len := uint64(len(addrs)) posting.Init(clog) if *instanceIdx != 0 { - worker.Init(ps, nil, addrs, *instanceIdx, len(addrs)) + worker.Init(ps, nil, addrs, *instanceIdx, len) uid.Init(nil) } else { uidStore := new(store.Store) uidStore.Init(*uidDir) defer uidStore.Close() // Only server instance 0 will have uidStore - worker.Init(ps, uidStore, addrs, *instanceIdx, len(addrs)) + worker.Init(ps, uidStore, addrs, *instanceIdx, len) uid.Init(uidStore) } diff --git a/worker/worker.go b/worker/worker.go index 200fda61a6a3a945fd518aa7d5265fac12a1a6ed..86992cb18234e1fabeb05562f7a455d4a672fd3c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -27,7 +27,7 @@ var addrs []string func Init(ps, uStore *store.Store, workerList []string, idx, numInst uint64) { dataStore = ps - uidStore = xuStore + uidStore = uStore addrs = workerList instanceIdx = idx numInstances = numInst @@ -61,24 +61,29 @@ func Connect() { glog.Info("Server started. Clients connected.") } -// TODO:The format of worker IP input has to be discussed -func ProcessTaskOverNetwork(qu []byte, idx uint64) (result []byte, rerr error) { - pool := pools[idx] - addr := addrs[idx] - client, err := pool.Get() - if err != nil { - glog.Fatal(err) - } - query := new(conn.Query) - query.Data = qu - reply := new(conn.Reply) - if err = client.Call("Worker.ServeTask", query, reply); err != nil { - glog.WithField("call", "Worker.ServeTask").Fatal(err) - } - glog.WithField("reply", string(reply.Data)).WithField("addr", addr). - Info("Got reply from server") +func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { + uo := flatbuffers.GetUOffsetT(qu) + q := new(task.Query) + q.Init(qu, uo) - return reply.Data, nil + attr := string(q.Attr()) + idx := farm.Fingerprint64([]byte(attr)) % numInstances + if idx == instanceIdx { + return ProcessTask(qu) + } else { + pool := pools[idx] + addr := addrs[idx] + query := new(conn.Query) + query.Data = qu + reply := new(conn.Reply) + if err := pool.Call("Worker.ServeTask", query, reply); err != nil { + glog.WithField("call", "Worker.ServeTask").Fatal(err) + } + glog.WithField("reply", string(reply.Data)).WithField("addr", addr). + Info("Got reply from server") + + return reply.Data, nil + } } func ProcessTask(query []byte) (result []byte, rerr error) { @@ -91,47 +96,43 @@ func ProcessTask(query []byte) (result []byte, rerr error) { uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) attr := string(q.Attr()) - if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { - for i := 0; i < q.UidsLength(); i++ { - uid := q.Uids(i) - key := posting.Key(uid, attr) - pl := posting.GetOrCreate(key, dataStore) - - var valoffset flatbuffers.UOffsetT - if val, err := pl.Value(); err != nil { - valoffset = b.CreateByteVector(x.Nilbyte) - } else { - valoffset = b.CreateByteVector(val) - } - task.ValueStart(b) - task.ValueAddVal(b, valoffset) - voffsets[i] = task.ValueEnd(b) - ulist := pl.GetUids() - uoffsets[i] = x.UidlistOffset(b, ulist) - } - task.ResultStartValuesVector(b, len(voffsets)) - for i := len(voffsets) - 1; i >= 0; i-- { - b.PrependUOffsetT(voffsets[i]) - } - valuesVent := b.EndVector(len(voffsets)) + for i := 0; i < q.UidsLength(); i++ { + uid := q.Uids(i) + key := posting.Key(uid, attr) + pl := posting.GetOrCreate(key, dataStore) - task.ResultStartUidmatrixVector(b, len(uoffsets)) - for i := len(uoffsets) - 1; i >= 0; i-- { - b.PrependUOffsetT(uoffsets[i]) + var valoffset flatbuffers.UOffsetT + if val, err := pl.Value(); err != nil { + valoffset = b.CreateByteVector(x.Nilbyte) + } else { + valoffset = b.CreateByteVector(val) } - matrixVent := b.EndVector(len(uoffsets)) - - task.ResultStart(b) - task.ResultAddValues(b, valuesVent) - task.ResultAddUidmatrix(b, matrixVent) - rend := task.ResultEnd(b) - b.Finish(rend) - return b.Bytes[b.Head():], nil - } else { - return ProcessTaskOverNetwork(query, - farm.Fingerprint64([]byte(attr))%numInstances) + task.ValueStart(b) + task.ValueAddVal(b, valoffset) + voffsets[i] = task.ValueEnd(b) + + ulist := pl.GetUids() + uoffsets[i] = x.UidlistOffset(b, ulist) + } + task.ResultStartValuesVector(b, len(voffsets)) + for i := len(voffsets) - 1; i >= 0; i-- { + b.PrependUOffsetT(voffsets[i]) + } + valuesVent := b.EndVector(len(voffsets)) + + task.ResultStartUidmatrixVector(b, len(uoffsets)) + for i := len(uoffsets) - 1; i >= 0; i-- { + b.PrependUOffsetT(uoffsets[i]) } + matrixVent := b.EndVector(len(uoffsets)) + + task.ResultStart(b) + task.ResultAddValues(b, valuesVent) + task.ResultAddUidmatrix(b, matrixVent) + rend := task.ResultEnd(b) + b.Finish(rend) + return b.Bytes[b.Head():], nil } func NewQuery(attr string, uids []uint64) []byte { @@ -164,7 +165,16 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error { } func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { - reply.Data, rerr = ProcessTask(query.Data) + uo := flatbuffers.GetUOffsetT(query.Data) + q := new(task.Query) + q.Init(query.Data, uo) + + attr := string(q.Attr()) + if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { + reply.Data, rerr = ProcessTask(query.Data) + } else { + glog.Fatalf("Request sent to wrong server") + } return rerr }