diff --git a/conn/codec.go b/conn/codec.go
index e40864a02892aca7e242b0fca284f196a5975850..08954f2bb0dca91978b70c1f8ca1193fcb7fed76 100644
--- a/conn/codec.go
+++ b/conn/codec.go
@@ -15,6 +15,8 @@ type Query struct {
 
 type Reply struct {
 	Data []byte
+	// TODO(manishrjain): Add an error here.
+	// Error string
 }
 
 func writeHeader(rwc io.ReadWriteCloser, seq uint64,
diff --git a/conn/server.go b/conn/server.go
index 986783a5f49f396496fd4cb4569ecb5f967326ae..e3e7c4dba08316abf6c9a933b21c8751fd5a9147 100644
--- a/conn/server.go
+++ b/conn/server.go
@@ -35,7 +35,9 @@ func (c *ServerCodec) ReadRequestBody(data interface{}) error {
 	return nil
 }
 
-func (c *ServerCodec) WriteResponse(resp *rpc.Response, data interface{}) error {
+func (c *ServerCodec) WriteResponse(resp *rpc.Response,
+	data interface{}) error {
+
 	if len(resp.Error) > 0 {
 		log.Fatal("Response has error: " + resp.Error)
 	}
diff --git a/server/main.go b/server/main.go
index 5882f182aad1f81e559e9131d7d61dff2d25b6eb..f71564f5ed86636ee51f05eea320a356d1ff0a58 100644
--- a/server/main.go
+++ b/server/main.go
@@ -17,6 +17,7 @@
 package main
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io/ioutil"
@@ -30,6 +31,7 @@ import (
 	"github.com/dgraph-io/dgraph/gql"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/query"
+	"github.com/dgraph-io/dgraph/rdf"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/uid"
 	"github.com/dgraph-io/dgraph/worker"
@@ -59,6 +61,23 @@ func addCorsHeaders(w http.ResponseWriter) {
 	w.Header().Set("Connection", "close")
 }
 
+func mutationHandler(mu *gql.Mutation) error {
+	r := strings.NewReader(mu.Set)
+	scanner := bufio.NewScanner(r)
+	for scanner.Scan() {
+		ln := strings.Trim(scanner.Text(), " \t")
+		if len(ln) == 0 {
+			continue
+		}
+		_, err := rdf.Parse(ln)
+		if err != nil {
+			glog.WithError(err).Error("While parsing RDF.")
+			return err
+		}
+	}
+	return nil
+}
+
 func queryHandler(w http.ResponseWriter, r *http.Request) {
 	addCorsHeaders(w)
 	if r.Method == "OPTIONS" {
@@ -80,12 +99,14 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	glog.WithField("q", string(q)).Debug("Query received.")
-	gq, _, err := gql.Parse(string(q))
+	gq, mu, err := gql.Parse(string(q))
 	if err != nil {
 		x.Err(glog, err).Error("While parsing query")
 		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
 		return
 	}
+	mutationHandler(mu)
+
 	sg, err := query.ToSubGraph(gq)
 	if err != nil {
 		x.Err(glog, err).Error("While conversion to internal format")
diff --git a/worker/mutation.go b/worker/mutation.go
new file mode 100644
index 0000000000000000000000000000000000000000..8b204b78a542e3c62167aa945e4e57a1d2e407c0
--- /dev/null
+++ b/worker/mutation.go
@@ -0,0 +1,128 @@
+package worker
+
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"sync"
+
+	"github.com/dgraph-io/dgraph/conn"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
+)
+
+type Mutations struct {
+	Set []x.DirectedEdge
+	Del []x.DirectedEdge
+}
+
+func (m *Mutations) Encode() (data []byte, rerr error) {
+	var b bytes.Buffer
+	enc := gob.NewEncoder(&b)
+	rerr = enc.Encode(*m)
+	return b.Bytes(), rerr
+}
+
+func (m *Mutations) Decode(data []byte) error {
+	r := bytes.NewReader(data)
+	dec := gob.NewDecoder(r)
+	return dec.Decode(m)
+}
+
+func mutate(m *Mutations, left *Mutations) error {
+	// For now, assume it's all only Set instructions.
+	for _, edge := range m.Set {
+		if farm.Fingerprint64(
+			[]byte(edge.Attribute))%numInstances != instanceIdx {
+
+			glog.WithField("instanceIdx", instanceIdx).
+				WithField("attr", edge.Attribute).
+				Info("Predicate fingerprint doesn't match instanceIdx")
+			return fmt.Errorf("predicate fingerprint doesn't match this instance.")
+		}
+
+		key := posting.Key(edge.Entity, edge.Attribute)
+		plist := posting.GetOrCreate(key, dataStore)
+		if err := plist.AddMutation(edge, posting.Set); err != nil {
+			left.Set = append(left.Set, edge)
+			glog.WithError(err).WithField("edge", edge).
+				Error("While adding mutation.")
+			continue
+		}
+	}
+	return nil
+}
+
+func runMutate(idx int, m *Mutations, wg *sync.WaitGroup,
+	replies chan *conn.Reply, che chan error) {
+
+	defer wg.Done()
+	left := new(Mutations)
+	if idx == int(instanceIdx) {
+		che <- mutate(m, left)
+		return
+	}
+
+	var err error
+	pool := pools[idx]
+	query := new(conn.Query)
+	query.Data, err = m.Encode()
+	if err != nil {
+		che <- err
+		return
+	}
+
+	reply := new(conn.Reply)
+	if err := pool.Call("Worker.Mutate", query, reply); err != nil {
+		glog.WithField("call", "Worker.Mutate").
+			WithField("addr", pool.Addr).
+			WithError(err).Error("While calling mutate")
+		che <- err
+		return
+	}
+	replies <- reply
+}
+
+func MutateOverNetwork(
+	edges []x.DirectedEdge) (left []x.DirectedEdge, rerr error) {
+
+	mutationArray := make([]*Mutations, numInstances)
+	for _, edge := range edges {
+		idx := farm.Fingerprint64([]byte(edge.Attribute)) % numInstances
+		mu := mutationArray[idx]
+		if mu == nil {
+			mu = new(Mutations)
+		}
+		mu.Set = append(mu.Set, edge)
+	}
+
+	var wg sync.WaitGroup
+	replies := make(chan *conn.Reply, numInstances)
+	errors := make(chan error, numInstances)
+	for idx, mu := range mutationArray {
+		if mu == nil || len(mu.Set) == 0 {
+			continue
+		}
+		wg.Add(1)
+		go runMutate(idx, mu, &wg, replies, errors)
+	}
+	wg.Wait()
+	close(replies)
+	close(errors)
+
+	for err := range errors {
+		if err != nil {
+			glog.WithError(err).Error("While running all mutations")
+			return left, err
+		}
+	}
+	for reply := range replies {
+		l := new(Mutations)
+		if err := l.Decode(reply.Data); err != nil {
+			return left, err
+		}
+		left = append(left, l.Set...)
+	}
+	return left, nil
+}
diff --git a/worker/task.go b/worker/task.go
new file mode 100644
index 0000000000000000000000000000000000000000..5ce45ff3ee0021810ba9285e29042896ebdef395
--- /dev/null
+++ b/worker/task.go
@@ -0,0 +1,93 @@
+package worker
+
+import (
+	"github.com/dgraph-io/dgraph/conn"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/task"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
+	"github.com/google/flatbuffers/go"
+)
+
+func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
+	uo := flatbuffers.GetUOffsetT(qu)
+	q := new(task.Query)
+	q.Init(qu, uo)
+
+	attr := string(q.Attr())
+	idx := farm.Fingerprint64([]byte(attr)) % numInstances
+
+	var runHere bool
+	if attr == "_xid_" || attr == "_uid_" {
+		idx = 0
+		runHere = (instanceIdx == 0)
+	} else {
+		runHere = (instanceIdx == idx)
+	}
+
+	if runHere {
+		// No need for a network call, as this should be run from within
+		// this instance.
+		return processTask(qu)
+	}
+
+	pool := pools[idx]
+	addr := pool.Addr
+	query := new(conn.Query)
+	query.Data = qu
+	reply := new(conn.Reply)
+	if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
+		glog.WithField("call", "Worker.ServeTask").Fatal(err)
+	}
+	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
+		Info("Got reply from server")
+	return reply.Data, nil
+}
+
+func processTask(query []byte) (result []byte, rerr error) {
+	uo := flatbuffers.GetUOffsetT(query)
+	q := new(task.Query)
+	q.Init(query, uo)
+	attr := string(q.Attr())
+
+	b := flatbuffers.NewBuilder(0)
+	voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
+	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
+
+	for i := 0; i < q.UidsLength(); i++ {
+		uid := q.Uids(i)
+		key := posting.Key(uid, attr)
+		pl := posting.GetOrCreate(key, dataStore)
+
+		var valoffset flatbuffers.UOffsetT
+		if val, err := pl.Value(); err != nil {
+			valoffset = b.CreateByteVector(x.Nilbyte)
+		} else {
+			valoffset = b.CreateByteVector(val)
+		}
+		task.ValueStart(b)
+		task.ValueAddVal(b, valoffset)
+		voffsets[i] = task.ValueEnd(b)
+
+		ulist := pl.GetUids()
+		uoffsets[i] = x.UidlistOffset(b, ulist)
+	}
+	task.ResultStartValuesVector(b, len(voffsets))
+	for i := len(voffsets) - 1; i >= 0; i-- {
+		b.PrependUOffsetT(voffsets[i])
+	}
+	valuesVent := b.EndVector(len(voffsets))
+
+	task.ResultStartUidmatrixVector(b, len(uoffsets))
+	for i := len(uoffsets) - 1; i >= 0; i-- {
+		b.PrependUOffsetT(uoffsets[i])
+	}
+	matrixVent := b.EndVector(len(uoffsets))
+
+	task.ResultStart(b)
+	task.ResultAddValues(b, valuesVent)
+	task.ResultAddUidmatrix(b, matrixVent)
+	rend := task.ResultEnd(b)
+	b.Finish(rend)
+	return b.Bytes[b.Head():], nil
+}
diff --git a/worker/worker.go b/worker/worker.go
index 1ed369419aa937eee63558429715b4f00c7b8593..c09afe8abc263ca7a9dadd48570cb63dffffeb55 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -7,7 +7,6 @@ import (
 	"net/rpc"
 
 	"github.com/dgraph-io/dgraph/conn"
-	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
@@ -63,87 +62,6 @@ func Connect(workerList []string) {
 	glog.Info("Server started. Clients connected.")
 }
 
-func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
-	uo := flatbuffers.GetUOffsetT(qu)
-	q := new(task.Query)
-	q.Init(qu, uo)
-
-	attr := string(q.Attr())
-	idx := farm.Fingerprint64([]byte(attr)) % numInstances
-
-	var runHere bool
-	if attr == "_xid_" || attr == "_uid_" {
-		idx = 0
-		runHere = (instanceIdx == 0)
-	} else {
-		runHere = (instanceIdx == idx)
-	}
-
-	if runHere {
-		return ProcessTask(qu)
-	}
-
-	pool := pools[idx]
-	addr := pool.Addr
-	query := new(conn.Query)
-	query.Data = qu
-	reply := new(conn.Reply)
-	if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
-		glog.WithField("call", "Worker.ServeTask").Fatal(err)
-	}
-	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
-		Info("Got reply from server")
-	return reply.Data, nil
-}
-
-func ProcessTask(query []byte) (result []byte, rerr error) {
-	uo := flatbuffers.GetUOffsetT(query)
-	q := new(task.Query)
-	q.Init(query, uo)
-	attr := string(q.Attr())
-
-	b := flatbuffers.NewBuilder(0)
-	voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
-	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
-
-	for i := 0; i < q.UidsLength(); i++ {
-		uid := q.Uids(i)
-		key := posting.Key(uid, attr)
-		pl := posting.GetOrCreate(key, dataStore)
-
-		var valoffset flatbuffers.UOffsetT
-		if val, err := pl.Value(); err != nil {
-			valoffset = b.CreateByteVector(x.Nilbyte)
-		} else {
-			valoffset = b.CreateByteVector(val)
-		}
-		task.ValueStart(b)
-		task.ValueAddVal(b, valoffset)
-		voffsets[i] = task.ValueEnd(b)
-
-		ulist := pl.GetUids()
-		uoffsets[i] = x.UidlistOffset(b, ulist)
-	}
-	task.ResultStartValuesVector(b, len(voffsets))
-	for i := len(voffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(voffsets[i])
-	}
-	valuesVent := b.EndVector(len(voffsets))
-
-	task.ResultStartUidmatrixVector(b, len(uoffsets))
-	for i := len(uoffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(uoffsets[i])
-	}
-	matrixVent := b.EndVector(len(uoffsets))
-
-	task.ResultStart(b)
-	task.ResultAddValues(b, valuesVent)
-	task.ResultAddUidmatrix(b, matrixVent)
-	rend := task.ResultEnd(b)
-	b.Finish(rend)
-	return b.Bytes[b.Head():], nil
-}
-
 func NewQuery(attr string, uids []uint64) []byte {
 	b := flatbuffers.NewBuilder(0)
 	task.QueryStartUidsVector(b, len(uids))
@@ -173,6 +91,20 @@ func (w *Worker) Hello(query *conn.Query, reply *conn.Reply) error {
 	return nil
 }
 
+func (w *Worker) Mutate(query *conn.Query, reply *conn.Reply) (rerr error) {
+	m := new(Mutations)
+	if err := m.Decode(query.Data); err != nil {
+		return err
+	}
+
+	left := new(Mutations)
+	if err := mutate(m, left); err != nil {
+		return err
+	}
+	reply.Data, rerr = left.Encode()
+	return
+}
+
 func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
 	uo := flatbuffers.GetUOffsetT(query.Data)
 	q := new(task.Query)
@@ -180,7 +112,7 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
 	attr := string(q.Attr())
 
 	if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
-		reply.Data, rerr = ProcessTask(query.Data)
+		reply.Data, rerr = processTask(query.Data)
 	} else {
 		glog.WithField("attribute", attr).
 			WithField("instanceIdx", instanceIdx).
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 809eb05c876431c5533e50a30f0c200d45cbadfb..e16a6f81ea57b27a654cb30e71d830fd38978ccf 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -83,7 +83,7 @@ func TestProcessTask(t *testing.T) {
 	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	query := NewQuery("friend", []uint64{10, 11, 12})
-	result, err := ProcessTask(query)
+	result, err := processTask(query)
 	if err != nil {
 		t.Error(err)
 	}