Skip to content
Snippets Groups Projects
Commit ab956821 authored by Manish R Jain's avatar Manish R Jain
Browse files

Merge pull request #41 from dgraph-io/dmuts

Distributed mutations from DirectedEdges
parents 2fde66ee b6b8bbc9
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,8 @@ type Query struct {
type Reply struct {
Data []byte
// TODO(manishrjain): Add an error here.
// Error string
}
func writeHeader(rwc io.ReadWriteCloser, seq uint64,
......
......@@ -35,7 +35,9 @@ func (c *ServerCodec) ReadRequestBody(data interface{}) error {
return nil
}
func (c *ServerCodec) WriteResponse(resp *rpc.Response, data interface{}) error {
func (c *ServerCodec) WriteResponse(resp *rpc.Response,
data interface{}) error {
if len(resp.Error) > 0 {
log.Fatal("Response has error: " + resp.Error)
}
......
......@@ -17,6 +17,7 @@
package main
import (
"bufio"
"flag"
"fmt"
"io/ioutil"
......@@ -30,6 +31,7 @@ import (
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
......@@ -59,6 +61,23 @@ func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Connection", "close")
}
func mutationHandler(mu *gql.Mutation) error {
r := strings.NewReader(mu.Set)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
ln := strings.Trim(scanner.Text(), " \t")
if len(ln) == 0 {
continue
}
_, err := rdf.Parse(ln)
if err != nil {
glog.WithError(err).Error("While parsing RDF.")
return err
}
}
return nil
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
......@@ -80,12 +99,14 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, _, err := gql.Parse(string(q))
gq, mu, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
mutationHandler(mu)
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
......
package worker
import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
)
type Mutations struct {
Set []x.DirectedEdge
Del []x.DirectedEdge
}
func (m *Mutations) Encode() (data []byte, rerr error) {
var b bytes.Buffer
enc := gob.NewEncoder(&b)
rerr = enc.Encode(*m)
return b.Bytes(), rerr
}
func (m *Mutations) Decode(data []byte) error {
r := bytes.NewReader(data)
dec := gob.NewDecoder(r)
return dec.Decode(m)
}
func mutate(m *Mutations, left *Mutations) error {
// For now, assume it's all only Set instructions.
for _, edge := range m.Set {
if farm.Fingerprint64(
[]byte(edge.Attribute))%numInstances != instanceIdx {
glog.WithField("instanceIdx", instanceIdx).
WithField("attr", edge.Attribute).
Info("Predicate fingerprint doesn't match instanceIdx")
return fmt.Errorf("predicate fingerprint doesn't match this instance.")
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
if err := plist.AddMutation(edge, posting.Set); err != nil {
left.Set = append(left.Set, edge)
glog.WithError(err).WithField("edge", edge).
Error("While adding mutation.")
continue
}
}
return nil
}
func runMutate(idx int, m *Mutations, wg *sync.WaitGroup,
replies chan *conn.Reply, che chan error) {
defer wg.Done()
left := new(Mutations)
if idx == int(instanceIdx) {
che <- mutate(m, left)
return
}
var err error
pool := pools[idx]
query := new(conn.Query)
query.Data, err = m.Encode()
if err != nil {
che <- err
return
}
reply := new(conn.Reply)
if err := pool.Call("Worker.Mutate", query, reply); err != nil {
glog.WithField("call", "Worker.Mutate").
WithField("addr", pool.Addr).
WithError(err).Error("While calling mutate")
che <- err
return
}
replies <- reply
}
func MutateOverNetwork(
edges []x.DirectedEdge) (left []x.DirectedEdge, rerr error) {
mutationArray := make([]*Mutations, numInstances)
for _, edge := range edges {
idx := farm.Fingerprint64([]byte(edge.Attribute)) % numInstances
mu := mutationArray[idx]
if mu == nil {
mu = new(Mutations)
}
mu.Set = append(mu.Set, edge)
}
var wg sync.WaitGroup
replies := make(chan *conn.Reply, numInstances)
errors := make(chan error, numInstances)
for idx, mu := range mutationArray {
if mu == nil || len(mu.Set) == 0 {
continue
}
wg.Add(1)
go runMutate(idx, mu, &wg, replies, errors)
}
wg.Wait()
close(replies)
close(errors)
for err := range errors {
if err != nil {
glog.WithError(err).Error("While running all mutations")
return left, err
}
}
for reply := range replies {
l := new(Mutations)
if err := l.Decode(reply.Data); err != nil {
return left, err
}
left = append(left, l.Set...)
}
return left, nil
}
package worker
import (
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
"github.com/google/flatbuffers/go"
)
func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(qu)
q := new(task.Query)
q.Init(qu, uo)
attr := string(q.Attr())
idx := farm.Fingerprint64([]byte(attr)) % numInstances
var runHere bool
if attr == "_xid_" || attr == "_uid_" {
idx = 0
runHere = (instanceIdx == 0)
} else {
runHere = (instanceIdx == idx)
}
if runHere {
// No need for a network call, as this should be run from within
// this instance.
return processTask(qu)
}
pool := pools[idx]
addr := pool.Addr
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
}
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil
}
func processTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query)
q.Init(query, uo)
attr := string(q.Attr())
b := flatbuffers.NewBuilder(0)
voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore)
var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil {
valoffset = b.CreateByteVector(x.Nilbyte)
} else {
valoffset = b.CreateByteVector(val)
}
task.ValueStart(b)
task.ValueAddVal(b, valoffset)
voffsets[i] = task.ValueEnd(b)
ulist := pl.GetUids()
uoffsets[i] = x.UidlistOffset(b, ulist)
}
task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(voffsets[i])
}
valuesVent := b.EndVector(len(voffsets))
task.ResultStartUidmatrixVector(b, len(uoffsets))
for i := len(uoffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(uoffsets[i])
}
matrixVent := b.EndVector(len(uoffsets))
task.ResultStart(b)
task.ResultAddValues(b, valuesVent)
task.ResultAddUidmatrix(b, matrixVent)
rend := task.ResultEnd(b)
b.Finish(rend)
return b.Bytes[b.Head():], nil
}
......@@ -7,7 +7,6 @@ import (
"net/rpc"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
......@@ -63,87 +62,6 @@ func Connect(workerList []string) {
glog.Info("Server started. Clients connected.")
}
func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(qu)
q := new(task.Query)
q.Init(qu, uo)
attr := string(q.Attr())
idx := farm.Fingerprint64([]byte(attr)) % numInstances
var runHere bool
if attr == "_xid_" || attr == "_uid_" {
idx = 0
runHere = (instanceIdx == 0)
} else {
runHere = (instanceIdx == idx)
}
if runHere {
return ProcessTask(qu)
}
pool := pools[idx]
addr := pool.Addr
query := new(conn.Query)
query.Data = qu
reply := new(conn.Reply)
if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
glog.WithField("call", "Worker.ServeTask").Fatal(err)
}
glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
Info("Got reply from server")
return reply.Data, nil
}
func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query)
q.Init(query, uo)
attr := string(q.Attr())
b := flatbuffers.NewBuilder(0)
voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore)
var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil {
valoffset = b.CreateByteVector(x.Nilbyte)
} else {
valoffset = b.CreateByteVector(val)
}
task.ValueStart(b)
task.ValueAddVal(b, valoffset)
voffsets[i] = task.ValueEnd(b)
ulist := pl.GetUids()
uoffsets[i] = x.UidlistOffset(b, ulist)
}
task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(voffsets[i])
}
valuesVent := b.EndVector(len(voffsets))
task.ResultStartUidmatrixVector(b, len(uoffsets))
for i := len(uoffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(uoffsets[i])
}
matrixVent := b.EndVector(len(uoffsets))
task.ResultStart(b)
task.ResultAddValues(b, valuesVent)
task.ResultAddUidmatrix(b, matrixVent)
rend := task.ResultEnd(b)
b.Finish(rend)
return b.Bytes[b.Head():], nil
}
func NewQuery(attr string, uids []uint64) []byte {
b := flatbuffers.NewBuilder(0)
task.QueryStartUidsVector(b, len(uids))
......@@ -173,6 +91,20 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
return nil
}
func (w *Worker) Mutate(query *conn.Query, reply *conn.Reply) (rerr error) {
m := new(Mutations)
if err := m.Decode(query.Data); err != nil {
return err
}
left := new(Mutations)
if err := mutate(m, left); err != nil {
return err
}
reply.Data, rerr = left.Encode()
return
}
func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
uo := flatbuffers.GetUOffsetT(query.Data)
q := new(task.Query)
......@@ -180,7 +112,7 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
attr := string(q.Attr())
if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
reply.Data, rerr = ProcessTask(query.Data)
reply.Data, rerr = processTask(query.Data)
} else {
glog.WithField("attribute", attr).
WithField("instanceIdx", instanceIdx).
......
......@@ -83,7 +83,7 @@ func TestProcessTask(t *testing.T) {
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
query := NewQuery("friend", []uint64{10, 11, 12})
result, err := ProcessTask(query)
result, err := processTask(query)
if err != nil {
t.Error(err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment