From 8bc03c2c12593a8f4f4c44e4d3dc106d8b537237 Mon Sep 17 00:00:00 2001
From: Ashwin <ashwin2007ray@gmail.com>
Date: Fri, 26 Feb 2016 04:20:46 +1100
Subject: [PATCH] added support for distributed query through network calls

---
 :q                    |   5 ++
 query/query_test.go   |   4 +-
 server/main.go        |   4 +-
 server/main_test.go   |   2 +-
 worker/worker.go      | 103 ++++++++++++++++++++++++++++--------------
 worker/worker_test.go |   2 +-
 6 files changed, 80 insertions(+), 40 deletions(-)
 create mode 100644 :q

diff --git a/:q b/:q
new file mode 100644
index 00000000..ba6f93ab
--- /dev/null
+++ b/:q
@@ -0,0 +1,5 @@
+worker/worker.go|30 col 62| : expected type, found ')'
+worker/worker.go|31 col 12| : expected ';', found '='
+worker/worker.go|39 col 2| : expected declaration, found 'if'
+worker/worker.go|48 col 2| : expected declaration, found 'for'
+worker/worker.go|107 col 4| : expected declaration, found 'if'
diff --git a/query/query_test.go b/query/query_test.go
index 00393e47..fde7afd4 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
 		t.Error(err)
 	}
 
-	worker.Init(ps, nil)
+	worker.Init(ps, nil, 0, 1)
 
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
 	ps := new(store.Store)
 	ps.Init(dir)
 
-	worker.Init(ps, nil)
+	worker.Init(ps, nil, 0, 1)
 
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
diff --git a/server/main.go b/server/main.go
index 72feeaab..499f689c 100644
--- a/server/main.go
+++ b/server/main.go
@@ -147,10 +147,10 @@ func main() {
 		xiduidStore := new(store.Store)
 		xiduidStore.Init(*xiduidDir)
 		defer xiduidStore.Close()
-		worker.Init(ps, xiduidStore) //Only server instance 0 will have xiduidStore
+		worker.Init(ps, xiduidStore, *instanceIdx, *numInstances) //Only server instance 0 will have xiduidStore
 		uid.Init(xiduidStore)
 	} else {
-		worker.Init(ps, nil)
+		worker.Init(ps, nil, *instanceIdx, *numInstances)
 	}
 	worker.Connect()
 
diff --git a/server/main_test.go b/server/main_test.go
index 5e6a9f81..962256cd 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
 	clog.Init()
 
 	posting.Init(clog)
-	worker.Init(ps, nil)
+	worker.Init(ps, nil, 0, 1)
 	uid.Init(ps)
 	loader.Init(ps, ps)
 
diff --git a/worker/worker.go b/worker/worker.go
index cfa3b954..a6fd374a 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -12,6 +12,7 @@ import (
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
 	"github.com/google/flatbuffers/go"
 )
 
@@ -23,10 +24,14 @@ var workerPort = flag.String("workerport", ":12345",
 var glog = x.Log("worker")
 var dataStore, xiduidStore *store.Store
 var pools []*conn.Pool
+var addrs = strings.Split(*workers, ",")
+var numInstances, instanceIdx uint64
 
-func Init(ps, xuStore *store.Store) {
+func Init(ps, xuStore *store.Store, idx, numInst uint64) {
 	dataStore = ps
 	xiduidStore = xuStore
+	numInstances = numInst
+	instanceIdx = idx
 }
 
 func Connect() {
@@ -63,6 +68,26 @@ func Connect() {
 	glog.Info("Server started. Clients connected.")
 }
 
+// TODO:The format of worker IP input has to be discussed
+func ProcessTaskOverNetwork(qu []byte, idx uint64) (result []byte, rerr error) {
+	pool := pools[idx]
+	addr := addrs[idx]
+	client, err := pool.Get()
+	if err != nil {
+		glog.Fatal(err)
+	}
+	query := new(conn.Query)
+	query.Data = qu
+	reply := new(conn.Reply)
+	if err = client.Call("Worker.ServeTask", query, reply); err != nil {
+		glog.WithField("call", "Worker.ServeTask").Fatal(err)
+	}
+	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
+		Info("Got reply from server")
+
+	return reply.Data, nil
+}
+
 func ProcessTask(query []byte) (result []byte, rerr error) {
 	uo := flatbuffers.GetUOffsetT(query)
 	q := new(task.Query)
@@ -73,42 +98,47 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
 	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
 
 	attr := string(q.Attr())
-	for i := 0; i < q.UidsLength(); i++ {
-		uid := q.Uids(i)
-		key := posting.Key(uid, attr)
-		pl := posting.GetOrCreate(key, dataStore)
-
-		var valoffset flatbuffers.UOffsetT
-		if val, err := pl.Value(); err != nil {
-			valoffset = b.CreateByteVector(x.Nilbyte)
-		} else {
-			valoffset = b.CreateByteVector(val)
-		}
-		task.ValueStart(b)
-		task.ValueAddVal(b, valoffset)
-		voffsets[i] = task.ValueEnd(b)
+	if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
+		for i := 0; i < q.UidsLength(); i++ {
+			uid := q.Uids(i)
+			key := posting.Key(uid, attr)
+			pl := posting.GetOrCreate(key, dataStore)
+
+			var valoffset flatbuffers.UOffsetT
+			if val, err := pl.Value(); err != nil {
+				valoffset = b.CreateByteVector(x.Nilbyte)
+			} else {
+				valoffset = b.CreateByteVector(val)
+			}
+			task.ValueStart(b)
+			task.ValueAddVal(b, valoffset)
+			voffsets[i] = task.ValueEnd(b)
 
-		ulist := pl.GetUids()
-		uoffsets[i] = x.UidlistOffset(b, ulist)
-	}
-	task.ResultStartValuesVector(b, len(voffsets))
-	for i := len(voffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(voffsets[i])
-	}
-	valuesVent := b.EndVector(len(voffsets))
+			ulist := pl.GetUids()
+			uoffsets[i] = x.UidlistOffset(b, ulist)
+		}
+		task.ResultStartValuesVector(b, len(voffsets))
+		for i := len(voffsets) - 1; i >= 0; i-- {
+			b.PrependUOffsetT(voffsets[i])
+		}
+		valuesVent := b.EndVector(len(voffsets))
 
-	task.ResultStartUidmatrixVector(b, len(uoffsets))
-	for i := len(uoffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(uoffsets[i])
+		task.ResultStartUidmatrixVector(b, len(uoffsets))
+		for i := len(uoffsets) - 1; i >= 0; i-- {
+			b.PrependUOffsetT(uoffsets[i])
+		}
+		matrixVent := b.EndVector(len(uoffsets))
+
+		task.ResultStart(b)
+		task.ResultAddValues(b, valuesVent)
+		task.ResultAddUidmatrix(b, matrixVent)
+		rend := task.ResultEnd(b)
+		b.Finish(rend)
+		return b.Bytes[b.Head():], nil
+	} else {
+		return ProcessTaskOverNetwork(query,
+			farm.Fingerprint64([]byte(attr))%numInstances)
 	}
-	matrixVent := b.EndVector(len(uoffsets))
-
-	task.ResultStart(b)
-	task.ResultAddValues(b, valuesVent)
-	task.ResultAddUidmatrix(b, matrixVent)
-	rend := task.ResultEnd(b)
-	b.Finish(rend)
-	return b.Bytes[b.Head():], nil
 }
 
 func NewQuery(attr string, uids []uint64) []byte {
@@ -140,6 +170,11 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
 	return nil
 }
 
+func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
+	reply.Data, rerr = ProcessTask(query.Data)
+	return rerr
+}
+
 func serveRequests(irwc io.ReadWriteCloser) {
 	for {
 		sc := &conn.ServerCodec{
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 8d2dbc8e..809eb05c 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
 	defer clog.Close()
 
 	posting.Init(clog)
-	Init(ps, nil)
+	Init(ps, nil, 0, 1)
 
 	edge := x.DirectedEdge{
 		ValueId:   23,
-- 
GitLab