Skip to content
Snippets Groups Projects
Commit 8a0a2a9a authored by Manish R Jain's avatar Manish R Jain
Browse files

Add a Mutate RPC call in the worker. This would be used to propagate mutations across the cluster.

parent 799450d6
No related branches found
No related tags found
No related merge requests found
...@@ -50,6 +50,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0, ...@@ -50,6 +50,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0,
"serves only entities whose Fingerprint % numInstance == instanceIdx.") "serves only entities whose Fingerprint % numInstance == instanceIdx.")
var workers = flag.String("workers", "", var workers = flag.String("workers", "",
"Comma separated list of IP addresses of workers") "Comma separated list of IP addresses of workers")
var numInstances uint64
func addCorsHeaders(w http.ResponseWriter) { func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
...@@ -166,17 +167,17 @@ func main() { ...@@ -166,17 +167,17 @@ func main() {
defer clog.Close() defer clog.Close()
addrs := strings.Split(*workers, ",") addrs := strings.Split(*workers, ",")
numInstances = len(addrs)
posting.Init(clog) posting.Init(clog)
if *instanceIdx != 0 { if *instanceIdx != 0 {
worker.Init(ps, nil, addrs) worker.Init(ps, nil, addrs, *instanceIdx)
uid.Init(nil) uid.Init(nil)
} else { } else {
uidStore := new(store.Store) uidStore := new(store.Store)
uidStore.Init(*uidDir) uidStore.Init(*uidDir)
defer uidStore.Close() defer uidStore.Close()
// Only server instance 0 will have uidStore // Only server instance 0 will have uidStore
worker.Init(ps, uidStore, addrs) worker.Init(ps, uidStore, addrs, *instanceIdx)
uid.Init(uidStore) uid.Init(uidStore)
} }
......
...@@ -2,6 +2,7 @@ package worker ...@@ -2,6 +2,7 @@ package worker
import ( import (
"flag" "flag"
"fmt"
"io" "io"
"net" "net"
"net/rpc" "net/rpc"
...@@ -11,6 +12,7 @@ import ( ...@@ -11,6 +12,7 @@ import (
"github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
"github.com/google/flatbuffers/go" "github.com/google/flatbuffers/go"
) )
...@@ -21,11 +23,13 @@ var glog = x.Log("worker") ...@@ -21,11 +23,13 @@ var glog = x.Log("worker")
var dataStore, xiduidStore *store.Store var dataStore, xiduidStore *store.Store
var pools []*conn.Pool var pools []*conn.Pool
var addrs []string var addrs []string
var instanceIdx uint64
func Init(ps, xuStore *store.Store, workerList []string) { func Init(ps, xuStore *store.Store, workerList []string, idx uint64) {
dataStore = ps dataStore = ps
xiduidStore = xuStore xiduidStore = xuStore
addrs = workerList addrs = workerList
instanceIdx = idx
} }
func Connect() { func Connect() {
...@@ -133,6 +137,36 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error { ...@@ -133,6 +137,36 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
return nil return nil
} }
func (w *Worker) Mutate(query *conn.Query, reply *conn.Reply) (rerr error) {
m := new(Mutations)
if err := m.Decode(query.Data); err != nil {
return err
}
left := new(Mutations)
// For now, assume it's all only Set instructions.
for _, edge := range m.Set {
if farm.Fingerprint64(
[]byte(edge.Attribute))%uint64(len(addrs)) != instanceIdx {
glog.WithField("instanceIdx", instanceIdx).
WithField("attr", edge.Attribute).
Info("Predicate fingerprint doesn't match instanceIdx")
return fmt.Errorf("predicate fingerprint doesn't match this instance.")
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
if err := plist.AddMutation(edge, posting.Set); err != nil {
left.Set = append(left.Set, edge)
glog.WithError(err).WithField("edge", edge).Error("While adding mutation.")
continue
}
}
reply.Data, rerr = left.Encode()
return
}
func serveRequests(irwc io.ReadWriteCloser) { func serveRequests(irwc io.ReadWriteCloser) {
for { for {
sc := &conn.ServerCodec{ sc := &conn.ServerCodec{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment