Commit 2fde66ee authored by Manish R Jain

Merge pull request #40 from dgraph-io/distributedServer

Distributed server
parents 0deabbb5 a7ddfa56
@@ -13,12 +13,12 @@ var glog = x.Log("conn")
 type Pool struct {
 	clients chan *rpc.Client
-	addr    string
+	Addr    string
 }
 
 func NewPool(addr string, maxCap int) *Pool {
 	p := new(Pool)
-	p.addr = addr
+	p.Addr = addr
 	p.clients = make(chan *rpc.Client, maxCap)
 	client, err := p.dialNew()
 	if err != nil {
@@ -36,7 +36,7 @@ func (p *Pool) dialNew() (*rpc.Client, error) {
 	var nconn net.Conn
 	var err error
 	for i := 0; i < 10; i++ {
-		nconn, err = d.Dial("tcp", p.addr)
+		nconn, err = d.Dial("tcp", p.Addr)
 		if err == nil {
 			break
 		}
@@ -44,7 +44,7 @@ func (p *Pool) dialNew() (*rpc.Client, error) {
 			break
 		}
-		glog.WithField("error", err).WithField("addr", p.addr).
+		glog.WithField("error", err).WithField("addr", p.Addr).
 			Info("Retrying connection...")
 		time.Sleep(10 * time.Second)
 	}
......
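
The only functional change in the connection pool is exporting addr as Addr, so packages outside conn can read which remote a pool points at; the worker changes below use it when logging replies. A minimal consumer-side sketch (not part of this commit) using only the API visible in this diff, i.e. NewPool, Call, and the conn.Query/conn.Reply types:

package main

import (
	"log"

	"github.com/dgraph-io/dgraph/conn"
)

func main() {
	// Second argument is the pool capacity (maxCap), as in NewPool above.
	pool := conn.NewPool("localhost:12345", 5)
	query := new(conn.Query)
	query.Data = []byte("hello")
	reply := new(conn.Reply)
	// Worker.Hello is the existing RPC method referenced later in this diff.
	if err := pool.Call("Worker.Hello", query, reply); err != nil {
		log.Fatalf("call to %s failed: %v", pool.Addr, err)
	}
	log.Printf("reply from %s: %s", pool.Addr, reply.Data)
}
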
@@ -406,7 +406,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
 	var err error
 	if len(sg.query) > 0 && sg.Attr != "_root_" {
 		// This task execution would go over the wire in later versions.
-		sg.result, err = worker.ProcessTask(sg.query)
+		sg.result, err = worker.ProcessTaskOverNetwork(sg.query)
 		if err != nil {
 			x.Err(glog, err).Error("While processing task.")
 			rch <- err
......
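
Switching ProcessGraph from worker.ProcessTask to worker.ProcessTaskOverNetwork means the query layer no longer assumes a task can be answered locally: the worker hashes the attribute name to decide which instance owns it, executes in-process when the hash lands on the local instance, and otherwise forwards the task over RPC (see the worker changes further down). A standalone sketch of just that routing rule; chooseInstance is a hypothetical helper, not a function added by this commit:

package main

import (
	"fmt"

	"github.com/dgryski/go-farm"
)

// chooseInstance mirrors the routing in ProcessTaskOverNetwork: _xid_ and _uid_
// always live on instance 0, every other attribute is sharded by fingerprint.
func chooseInstance(attr string, numInstances uint64) uint64 {
	if attr == "_xid_" || attr == "_uid_" {
		return 0
	}
	return farm.Fingerprint64([]byte(attr)) % numInstances
}

func main() {
	for _, attr := range []string{"_xid_", "friend", "name"} {
		fmt.Printf("%q -> instance %d of 3\n", attr, chooseInstance(attr, 3))
	}
}
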
@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
 		t.Error(err)
 	}
-	worker.Init(ps, nil, nil)
+	worker.Init(ps, nil, 0, 1)
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
 	ps := new(store.Store)
 	ps.Init(dir)
-	worker.Init(ps, nil, nil)
+	worker.Init(ps, nil, 0, 1)
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
......
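
The test setups above now call worker.Init(ps, nil, 0, 1), i.e. "this process is instance 0 of 1". A quick sketch (not part of the commit) of why that keeps the tests self-contained: with a single instance every fingerprint reduces to 0, so ProcessTaskOverNetwork always falls through to the local ProcessTask and no peer connections are needed.

package main

import (
	"fmt"

	"github.com/dgryski/go-farm"
)

func main() {
	numInstances := uint64(1) // as in the tests' Init(ps, nil, 0, 1)
	for _, attr := range []string{"_xid_", "friend", "name"} {
		fmt.Println(attr, "-> instance", farm.Fingerprint64([]byte(attr))%numInstances)
	}
}
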
@@ -144,21 +144,23 @@ func main() {
 	defer clog.Close()
 	addrs := strings.Split(*workers, ",")
+	lenAddr := uint64(len(addrs))
 	posting.Init(clog)
 	if *instanceIdx != 0 {
-		worker.Init(ps, nil, addrs)
+		worker.Init(ps, nil, *instanceIdx, lenAddr)
 		uid.Init(nil)
 	} else {
 		uidStore := new(store.Store)
 		uidStore.Init(*uidDir)
 		defer uidStore.Close()
 		// Only server instance 0 will have uidStore
-		worker.Init(ps, uidStore, addrs)
+		worker.Init(ps, uidStore, *instanceIdx, lenAddr)
 		uid.Init(uidStore)
 	}
-	worker.Connect()
+	worker.Connect(addrs)
 	http.HandleFunc("/query", queryHandler)
 	glog.WithField("port", *port).Info("Listening for requests...")
......
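
main() now derives everything from two per-process settings: this instance's index and the shared worker list (whose length becomes lenAddr, the total number of instances). Only the dereferences *instanceIdx and *workers appear in this diff; a hedged sketch of the flag definitions they presumably correspond to, where the names and defaults are assumptions rather than part of the commit:

package main

import "flag"

// Hypothetical flag definitions; only the dereferences *instanceIdx and *workers
// are visible in the diff above, so treat these names and defaults as guesses.
var (
	instanceIdx = flag.Uint64("instanceIdx", 0,
		"Index of this server among all instances. Instance 0 also owns the uid store.")
	workers = flag.String("workers", "",
		"Comma separated list of worker addresses, in the same order on every instance.")
)
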
@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
 	clog.Init()
 	posting.Init(clog)
-	worker.Init(ps, nil, nil)
+	worker.Init(ps, nil, 0, 1)
 	uid.Init(ps)
 	loader.Init(ps, ps)
......
@@ -11,6 +11,7 @@ import (
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
 	"github.com/google/flatbuffers/go"
 )
@@ -18,17 +19,18 @@ var workerPort = flag.String("workerport", ":12345",
 	"Port used by worker for internal communication.")
 var glog = x.Log("worker")
-var dataStore, xiduidStore *store.Store
+var dataStore, uidStore *store.Store
 var pools []*conn.Pool
-var addrs []string
+var numInstances, instanceIdx uint64
 
-func Init(ps, xuStore *store.Store, workerList []string) {
+func Init(ps, uStore *store.Store, idx, numInst uint64) {
 	dataStore = ps
-	xiduidStore = xuStore
-	addrs = workerList
+	uidStore = uStore
+	instanceIdx = idx
+	numInstances = numInst
 }
 
-func Connect() {
+func Connect(workerList []string) {
 	w := new(Worker)
 	if err := rpc.Register(w); err != nil {
 		glog.Fatal(err)
@@ -36,8 +38,13 @@ func Connect() {
 	if err := runServer(*workerPort); err != nil {
 		glog.Fatal(err)
 	}
+	if uint64(len(workerList)) != numInstances {
+		glog.WithField("len(list)", len(workerList)).
+			WithField("numInstances", numInstances).
+			Fatalf("Wrong number of instances in workerList")
+	}
-	for _, addr := range addrs {
+	for _, addr := range workerList {
 		if len(addr) == 0 {
 			continue
 		}
@@ -56,16 +63,49 @@ func Connect() {
 	glog.Info("Server started. Clients connected.")
 }
 
+func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
+	uo := flatbuffers.GetUOffsetT(qu)
+	q := new(task.Query)
+	q.Init(qu, uo)
+	attr := string(q.Attr())
+	idx := farm.Fingerprint64([]byte(attr)) % numInstances
+	var runHere bool
+	if attr == "_xid_" || attr == "_uid_" {
+		idx = 0
+		runHere = (instanceIdx == 0)
+	} else {
+		runHere = (instanceIdx == idx)
+	}
+	if runHere {
+		return ProcessTask(qu)
+	}
+	pool := pools[idx]
+	addr := pool.Addr
+	query := new(conn.Query)
+	query.Data = qu
+	reply := new(conn.Reply)
+	if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
+		glog.WithField("call", "Worker.ServeTask").Fatal(err)
+	}
+	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
+		Info("Got reply from server")
+	return reply.Data, nil
+}
+
 func ProcessTask(query []byte) (result []byte, rerr error) {
 	uo := flatbuffers.GetUOffsetT(query)
 	q := new(task.Query)
 	q.Init(query, uo)
+	attr := string(q.Attr())
 	b := flatbuffers.NewBuilder(0)
 	voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
 	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
-	attr := string(q.Attr())
 	for i := 0; i < q.UidsLength(); i++ {
 		uid := q.Uids(i)
 		key := posting.Key(uid, attr)
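
ProcessTaskOverNetwork forwards non-local tasks by calling the string-named method "Worker.ServeTask" through the connection pool. That naming comes from Go's net/rpc convention, where rpc.Register (used in Connect above) exposes the exported methods of a value under "TypeName.MethodName". A self-contained net/rpc sketch, independent of dgraph, showing that round trip:

package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }

type Worker struct{}

// Add has the net/rpc method shape: pointer args, pointer reply, error result.
func (w *Worker) Add(args *Args, reply *int) error {
	*reply = args.A + args.B
	return nil
}

func main() {
	if err := rpc.Register(new(Worker)); err != nil {
		log.Fatal(err)
	}
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	go rpc.Accept(ln)

	client, err := rpc.Dial("tcp", ln.Addr().String())
	if err != nil {
		log.Fatal(err)
	}
	var sum int
	// The "Type.Method" string is how net/rpc names services; dgraph's pool.Call
	// uses the same convention with "Worker.ServeTask".
	if err := client.Call("Worker.Add", &Args{A: 2, B: 3}, &sum); err != nil {
		log.Fatal(err)
	}
	fmt.Println(sum) // 5
}
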
@@ -133,6 +173,22 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
 	return nil
 }
 
+func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
+	uo := flatbuffers.GetUOffsetT(query.Data)
+	q := new(task.Query)
+	q.Init(query.Data, uo)
+	attr := string(q.Attr())
+	if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
+		reply.Data, rerr = ProcessTask(query.Data)
+	} else {
+		glog.WithField("attribute", attr).
+			WithField("instanceIdx", instanceIdx).
+			Fatalf("Request sent to wrong server")
+	}
+	return rerr
+}
+
 func serveRequests(irwc io.ReadWriteCloser) {
 	for {
 		sc := &conn.ServerCodec{
......
@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
 	defer clog.Close()
 	posting.Init(clog)
-	Init(ps, nil, nil)
+	Init(ps, nil, 0, 1)
 	edge := x.DirectedEdge{
 		ValueId: 23,
......