Skip to content
Snippets Groups Projects
Commit 8bc03c2c authored by Ashwin's avatar Ashwin
Browse files

added support for distributed query through network calls

parent 3e917ef0
No related branches found
No related tags found
No related merge requests found
:q 0 → 100644
worker/worker.go|30 col 62| : expected type, found ')'
worker/worker.go|31 col 12| : expected ';', found '='
worker/worker.go|39 col 2| : expected declaration, found 'if'
worker/worker.go|48 col 2| : expected declaration, found 'for'
worker/worker.go|107 col 4| : expected declaration, found 'if'
......@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
t.Error(err)
}
worker.Init(ps, nil)
worker.Init(ps, nil, 0, 1)
uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
......@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
ps := new(store.Store)
ps.Init(dir)
worker.Init(ps, nil)
worker.Init(ps, nil, 0, 1)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
......
......@@ -147,10 +147,10 @@ func main() {
xiduidStore := new(store.Store)
xiduidStore.Init(*xiduidDir)
defer xiduidStore.Close()
worker.Init(ps, xiduidStore) //Only server instance 0 will have xiduidStore
worker.Init(ps, xiduidStore, *instanceIdx, *numInstances) //Only server instance 0 will have xiduidStore
uid.Init(xiduidStore)
} else {
worker.Init(ps, nil)
worker.Init(ps, nil, *instanceIdx, *numInstances)
}
worker.Connect()
......
......@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
clog.Init()
posting.Init(clog)
worker.Init(ps, nil)
worker.Init(ps, nil, 0, 1)
uid.Init(ps)
loader.Init(ps, ps)
......
......@@ -12,6 +12,7 @@ import (
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
"github.com/google/flatbuffers/go"
)
......@@ -23,10 +24,14 @@ var workerPort = flag.String("workerport", ":12345",
var glog = x.Log("worker")
var dataStore, xiduidStore *store.Store
var pools []*conn.Pool
var addrs = strings.Split(*workers, ",")
var numInstances, instanceIdx uint64
func Init(ps, xuStore *store.Store) {
func Init(ps, xuStore *store.Store, idx, numInst uint64) {
dataStore = ps
xiduidStore = xuStore
numInstances = numInst
instanceIdx = idx
}
func Connect() {
......@@ -63,6 +68,26 @@ func Connect() {
glog.Info("Server started. Clients connected.")
}
// TODO:The format of worker IP input has to be discussed
func ProcessTaskOverNetwork(qu []byte, idx uint64) (result []byte, rerr error) {
pool := pools[idx]
addr := addrs[idx]
client, err := pool.Get()
if err != nil {
glog.Fatal(err)
}
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err = client.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
}
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil
}
func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query)
......@@ -73,42 +98,47 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
attr := string(q.Attr())
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore)
var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil {
valoffset = b.CreateByteVector(x.Nilbyte)
} else {
valoffset = b.CreateByteVector(val)
}
task.ValueStart(b)
task.ValueAddVal(b, valoffset)
voffsets[i] = task.ValueEnd(b)
if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore)
var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil {
valoffset = b.CreateByteVector(x.Nilbyte)
} else {
valoffset = b.CreateByteVector(val)
}
task.ValueStart(b)
task.ValueAddVal(b, valoffset)
voffsets[i] = task.ValueEnd(b)
ulist := pl.GetUids()
uoffsets[i] = x.UidlistOffset(b, ulist)
}
task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(voffsets[i])
}
valuesVent := b.EndVector(len(voffsets))
ulist := pl.GetUids()
uoffsets[i] = x.UidlistOffset(b, ulist)
}
task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(voffsets[i])
}
valuesVent := b.EndVector(len(voffsets))
task.ResultStartUidmatrixVector(b, len(uoffsets))
for i := len(uoffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(uoffsets[i])
task.ResultStartUidmatrixVector(b, len(uoffsets))
for i := len(uoffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(uoffsets[i])
}
matrixVent := b.EndVector(len(uoffsets))
task.ResultStart(b)
task.ResultAddValues(b, valuesVent)
task.ResultAddUidmatrix(b, matrixVent)
rend := task.ResultEnd(b)
b.Finish(rend)
return b.Bytes[b.Head():], nil
} else {
return ProcessTaskOverNetwork(query,
farm.Fingerprint64([]byte(attr))%numInstances)
}
matrixVent := b.EndVector(len(uoffsets))
task.ResultStart(b)
task.ResultAddValues(b, valuesVent)
task.ResultAddUidmatrix(b, matrixVent)
rend := task.ResultEnd(b)
b.Finish(rend)
return b.Bytes[b.Head():], nil
}
func NewQuery(attr string, uids []uint64) []byte {
......@@ -140,6 +170,11 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
return nil
}
func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
reply.Data, rerr = ProcessTask(query.Data)
return rerr
}
func serveRequests(irwc io.ReadWriteCloser) {
for {
sc := &conn.ServerCodec{
......
......@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
defer clog.Close()
posting.Init(clog)
Init(ps, nil)
Init(ps, nil, 0, 1)
edge := x.DirectedEdge{
ValueId: 23,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment