From 357a6599c5ece15506fa4d8f6749a2be30bcada6 Mon Sep 17 00:00:00 2001
From: Ashwin <ashwin2007ray@gmail.com>
Date: Sat, 27 Feb 2016 15:52:12 +1100
Subject: [PATCH] Code refactoring, function signature changes in worker.go

---
 query/query_test.go   |  4 +--
 server/main.go        |  8 +++---
 server/main_test.go   |  2 +-
 worker/worker.go      | 64 ++++++++++++++++++++-----------------------
 worker/worker_test.go |  2 +-
 5 files changed, 37 insertions(+), 43 deletions(-)

diff --git a/query/query_test.go b/query/query_test.go
index 95b124ce..7589cdf3 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
 		t.Error(err)
 	}
 
-	worker.Init(ps, nil, nil, 0, 1)
+	worker.Init(ps, nil, 0, 1)
 
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
 	ps := new(store.Store)
 	ps.Init(dir)
 
-	worker.Init(ps, nil, nil, 0, 1)
+	worker.Init(ps, nil, 0, 1)
 
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
diff --git a/server/main.go b/server/main.go
index f5e61bdd..5882f182 100644
--- a/server/main.go
+++ b/server/main.go
@@ -144,23 +144,23 @@ func main() {
 	defer clog.Close()
 
 	addrs := strings.Split(*workers, ",")
-	len := uint64(len(addrs))
+	lenAddr := uint64(len(addrs))
 
 	posting.Init(clog)
 
 	if *instanceIdx != 0 {
-		worker.Init(ps, nil, addrs, *instanceIdx, len)
+		worker.Init(ps, nil, *instanceIdx, lenAddr)
 		uid.Init(nil)
 	} else {
 		uidStore := new(store.Store)
 		uidStore.Init(*uidDir)
 		defer uidStore.Close()
 		// Only server instance 0 will have uidStore
-		worker.Init(ps, uidStore, addrs, *instanceIdx, len)
+		worker.Init(ps, uidStore, *instanceIdx, lenAddr)
 		uid.Init(uidStore)
 	}
 
-	worker.Connect()
+	worker.Connect(addrs)
 
 	http.HandleFunc("/query", queryHandler)
 	glog.WithField("port", *port).Info("Listening for requests...")
diff --git a/server/main_test.go b/server/main_test.go
index 07e1642f..962256cd 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
 	clog.Init()
 
 	posting.Init(clog)
-	worker.Init(ps, nil, nil, 0, 1)
+	worker.Init(ps, nil, 0, 1)
 	uid.Init(ps)
 	loader.Init(ps, ps)
 
diff --git a/worker/worker.go b/worker/worker.go
index 560b1e32..37363c84 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -22,18 +22,16 @@ var glog = x.Log("worker")
 var dataStore, uidStore *store.Store
 var pools []*conn.Pool
 var numInstances, instanceIdx uint64
-
 var addrs []string
 
-func Init(ps, uStore *store.Store, workerList []string, idx, numInst uint64) {
+func Init(ps, uStore *store.Store, idx, numInst uint64) {
 	dataStore = ps
 	uidStore = uStore
-	addrs = workerList
 	instanceIdx = idx
 	numInstances = numInst
 }
 
-func Connect() {
+func Connect(workerList []string) {
 	w := new(Worker)
 	if err := rpc.Register(w); err != nil {
 		glog.Fatal(err)
@@ -41,7 +39,13 @@ func Connect() {
 	if err := runServer(*workerPort); err != nil {
 		glog.Fatal(err)
 	}
+	if uint64(len(workerList)) != numInstances {
+		glog.WithField("len(list)", len(workerList)).
+			WithField("numInstances", numInstances).
+			Fatalf("Wrong number of instances in workerList")
+	}
 
+	addrs = workerList
 	for _, addr := range addrs {
 		if len(addr) == 0 {
 			continue
@@ -69,40 +73,28 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
 	attr := string(q.Attr())
 	idx := farm.Fingerprint64([]byte(attr)) % numInstances
 
+	runHere := false
 	if attr == "_xid_" || attr == "_uid_" {
-		if instanceIdx == 0 {
-			return ProcessTask(qu)
-		} else { // Send the request to instance 0 which has uidstore
-			pool := pools[0]
-			addr := addrs[0]
-			query := new(conn.Query)
-			query.Data = qu
-			reply := new(conn.Reply)
-			if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
-				glog.WithField("call", "Worker.ServeTask").Fatal(err)
-			}
-			glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
-				Info("Got reply from server")
-
-			return reply.Data, nil
-		}
+		idx = 0
+		runHere = (instanceIdx == 0)
 	} else {
-		if idx == instanceIdx {
-			return ProcessTask(qu)
-		} else {
-			pool := pools[idx]
-			addr := addrs[idx]
-			query := new(conn.Query)
-			query.Data = qu
-			reply := new(conn.Reply)
-			if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
-				glog.WithField("call", "Worker.ServeTask").Fatal(err)
-			}
-			glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
-				Info("Got reply from server")
+		runHere = (instanceIdx == idx)
+	}
 
-			return reply.Data, nil
+	if runHere {
+		return ProcessTask(qu)
+	} else { // Send the request to instance 0 which has uidstore
+		pool := pools[idx]
+		addr := addrs[idx]
+		query := new(conn.Query)
+		query.Data = qu
+		reply := new(conn.Reply)
+		if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
+			glog.WithField("call", "Worker.ServeTask").Fatal(err)
 		}
+		glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
+			Info("Got reply from server")
+		return reply.Data, nil
 	}
 }
 
@@ -193,7 +185,9 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
 	if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
 		reply.Data, rerr = ProcessTask(query.Data)
 	} else {
-		glog.Fatalf("Request sent to wrong server")
+		glog.WithField("attribute", attr).
+			WithField("instanceIdx", instanceIdx).
+			Fatalf("Request sent to wrong server")
 	}
 	return rerr
 }
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 8730617e..809eb05c 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
 	defer clog.Close()
 
 	posting.Init(clog)
-	Init(ps, nil, nil, 0, 1)
+	Init(ps, nil, 0, 1)
 
 	edge := x.DirectedEdge{
 		ValueId:   23,
-- 
GitLab