Skip to content
Snippets Groups Projects
Commit 357a6599 authored by Ashwin's avatar Ashwin
Browse files

Code refactoring, function signature changes in worker.go

parent b9dc381b
No related branches found
No related tags found
No related merge requests found
...@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) { ...@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
t.Error(err) t.Error(err)
} }
worker.Init(ps, nil, nil, 0, 1) worker.Init(ps, nil, 0, 1)
uo := flatbuffers.GetUOffsetT(sg.result) uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result) r := new(task.Result)
...@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) { ...@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
ps := new(store.Store) ps := new(store.Store)
ps.Init(dir) ps.Init(dir)
worker.Init(ps, nil, nil, 0, 1) worker.Init(ps, nil, 0, 1)
clog := commit.NewLogger(dir, "mutations", 50<<20) clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init() clog.Init()
......
...@@ -144,23 +144,23 @@ func main() { ...@@ -144,23 +144,23 @@ func main() {
defer clog.Close() defer clog.Close()
addrs := strings.Split(*workers, ",") addrs := strings.Split(*workers, ",")
len := uint64(len(addrs)) lenAddr := uint64(len(addrs))
posting.Init(clog) posting.Init(clog)
if *instanceIdx != 0 { if *instanceIdx != 0 {
worker.Init(ps, nil, addrs, *instanceIdx, len) worker.Init(ps, nil, *instanceIdx, lenAddr)
uid.Init(nil) uid.Init(nil)
} else { } else {
uidStore := new(store.Store) uidStore := new(store.Store)
uidStore.Init(*uidDir) uidStore.Init(*uidDir)
defer uidStore.Close() defer uidStore.Close()
// Only server instance 0 will have uidStore // Only server instance 0 will have uidStore
worker.Init(ps, uidStore, addrs, *instanceIdx, len) worker.Init(ps, uidStore, *instanceIdx, lenAddr)
uid.Init(uidStore) uid.Init(uidStore)
} }
worker.Connect() worker.Connect(addrs)
http.HandleFunc("/query", queryHandler) http.HandleFunc("/query", queryHandler)
glog.WithField("port", *port).Info("Listening for requests...") glog.WithField("port", *port).Info("Listening for requests...")
......
...@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er ...@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
clog.Init() clog.Init()
posting.Init(clog) posting.Init(clog)
worker.Init(ps, nil, nil, 0, 1) worker.Init(ps, nil, 0, 1)
uid.Init(ps) uid.Init(ps)
loader.Init(ps, ps) loader.Init(ps, ps)
......
...@@ -22,18 +22,16 @@ var glog = x.Log("worker") ...@@ -22,18 +22,16 @@ var glog = x.Log("worker")
var dataStore, uidStore *store.Store var dataStore, uidStore *store.Store
var pools []*conn.Pool var pools []*conn.Pool
var numInstances, instanceIdx uint64 var numInstances, instanceIdx uint64
var addrs []string var addrs []string
func Init(ps, uStore *store.Store, workerList []string, idx, numInst uint64) { func Init(ps, uStore *store.Store, idx, numInst uint64) {
dataStore = ps dataStore = ps
uidStore = uStore uidStore = uStore
addrs = workerList
instanceIdx = idx instanceIdx = idx
numInstances = numInst numInstances = numInst
} }
func Connect() { func Connect(workerList []string) {
w := new(Worker) w := new(Worker)
if err := rpc.Register(w); err != nil { if err := rpc.Register(w); err != nil {
glog.Fatal(err) glog.Fatal(err)
...@@ -41,7 +39,13 @@ func Connect() { ...@@ -41,7 +39,13 @@ func Connect() {
if err := runServer(*workerPort); err != nil { if err := runServer(*workerPort); err != nil {
glog.Fatal(err) glog.Fatal(err)
} }
if uint64(len(workerList)) != numInstances {
glog.WithField("len(list)", len(workerList)).
WithField("numInstances", numInstances).
Fatalf("Wrong number of instances in workerList")
}
addrs = workerList
for _, addr := range addrs { for _, addr := range addrs {
if len(addr) == 0 { if len(addr) == 0 {
continue continue
...@@ -69,40 +73,28 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { ...@@ -69,40 +73,28 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
attr := string(q.Attr()) attr := string(q.Attr())
idx := farm.Fingerprint64([]byte(attr)) % numInstances idx := farm.Fingerprint64([]byte(attr)) % numInstances
runHere := false
if attr == "_xid_" || attr == "_uid_" { if attr == "_xid_" || attr == "_uid_" {
if instanceIdx == 0 { idx = 0
return ProcessTask(qu) runHere = (instanceIdx == 0)
} else { // Send the request to instance 0 which has uidstore
pool := pools[0]
addr := addrs[0]
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
}
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil
}
} else { } else {
if idx == instanceIdx { runHere = (instanceIdx == idx)
return ProcessTask(qu) }
} else {
pool := pools[idx]
addr := addrs[idx]
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
}
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil if runHere {
return ProcessTask(qu)
} else { // Send the request to instance 0 which has uidstore
pool := pools[idx]
addr := addrs[idx]
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
} }
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil
} }
} }
...@@ -193,7 +185,9 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { ...@@ -193,7 +185,9 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
reply.Data, rerr = ProcessTask(query.Data) reply.Data, rerr = ProcessTask(query.Data)
} else { } else {
glog.Fatalf("Request sent to wrong server") glog.WithField("attribute", attr).
WithField("instanceIdx", instanceIdx).
Fatalf("Request sent to wrong server")
} }
return rerr return rerr
} }
......
...@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) { ...@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
defer clog.Close() defer clog.Close()
posting.Init(clog) posting.Init(clog)
Init(ps, nil, nil, 0, 1) Init(ps, nil, 0, 1)
edge := x.DirectedEdge{ edge := x.DirectedEdge{
ValueId: 23, ValueId: 23,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment