From 8a0a2a9aee9286c05e1beab855d62e5f1ce6272f Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Fri, 26 Feb 2016 17:48:53 +1100
Subject: [PATCH] Add a Mutate RPC call in the worker. This would be used to
 propagate mutations across the cluster.

---
 server/main.go   |  7 ++++---
 worker/worker.go | 36 +++++++++++++++++++++++++++++++++++-
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/server/main.go b/server/main.go
index 548b530c..fce2df10 100644
--- a/server/main.go
+++ b/server/main.go
@@ -50,6 +50,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0,
 	"serves only entities whose Fingerprint % numInstance == instanceIdx.")
 var workers = flag.String("workers", "",
 	"Comma separated list of IP addresses of workers")
+var numInstances uint64
 
 func addCorsHeaders(w http.ResponseWriter) {
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -166,17 +167,17 @@ func main() {
 	defer clog.Close()
 
 	addrs := strings.Split(*workers, ",")
-
+	numInstances = len(addrs)
 	posting.Init(clog)
 	if *instanceIdx != 0 {
-		worker.Init(ps, nil, addrs)
+		worker.Init(ps, nil, addrs, *instanceIdx)
 		uid.Init(nil)
 	} else {
 		uidStore := new(store.Store)
 		uidStore.Init(*uidDir)
 		defer uidStore.Close()
 		// Only server instance 0 will have uidStore
-		worker.Init(ps, uidStore, addrs)
+		worker.Init(ps, uidStore, addrs, *instanceIdx)
 		uid.Init(uidStore)
 	}
 
diff --git a/worker/worker.go b/worker/worker.go
index 20ee65a8..324f0edd 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"flag"
+	"fmt"
 	"io"
 	"net"
 	"net/rpc"
@@ -11,6 +12,7 @@ import (
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
 	"github.com/google/flatbuffers/go"
 )
 
@@ -21,11 +23,13 @@ var glog = x.Log("worker")
 var dataStore, xiduidStore *store.Store
 var pools []*conn.Pool
 var addrs []string
+var instanceIdx uint64
 
-func Init(ps, xuStore *store.Store, workerList []string) {
+func Init(ps, xuStore *store.Store, workerList []string, idx uint64) {
 	dataStore = ps
 	xiduidStore = xuStore
 	addrs = workerList
+	instanceIdx = idx
 }
 
 func Connect() {
@@ -133,6 +137,36 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
 	return nil
 }
 
+func (w *Worker) Mutate(query *conn.Query, reply *conn.Reply) (rerr error) {
+	m := new(Mutations)
+	if err := m.Decode(query.Data); err != nil {
+		return err
+	}
+
+	left := new(Mutations)
+	// For now, assume it's all only Set instructions.
+	for _, edge := range m.Set {
+		if farm.Fingerprint64(
+			[]byte(edge.Attribute))%uint64(len(addrs)) != instanceIdx {
+
+			glog.WithField("instanceIdx", instanceIdx).
+				WithField("attr", edge.Attribute).
+				Info("Predicate fingerprint doesn't match instanceIdx")
+			return fmt.Errorf("predicate fingerprint doesn't match this instance.")
+		}
+
+		key := posting.Key(edge.Entity, edge.Attribute)
+		plist := posting.GetOrCreate(key, dataStore)
+		if err := plist.AddMutation(edge, posting.Set); err != nil {
+			left.Set = append(left.Set, edge)
+			glog.WithError(err).WithField("edge", edge).Error("While adding mutation.")
+			continue
+		}
+	}
+	reply.Data, rerr = left.Encode()
+	return
+}
+
 func serveRequests(irwc io.ReadWriteCloser) {
 	for {
 		sc := &conn.ServerCodec{
-- 
GitLab