From 7ffebae568e488121a4b98db6ac3c70dcfe3f2ce Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Fri, 6 Nov 2015 13:09:21 +1100 Subject: [PATCH] Welcome to a breathing ToJson. Dgraph has taken it's first small step to mankind. --- posting/worker.go | 9 +--- query/query.go | 122 +++++++++++++++++++++++++++++++++++++++++++- query/query_test.go | 7 +++ x/x.go | 7 +++ 4 files changed, 136 insertions(+), 9 deletions(-) diff --git a/posting/worker.go b/posting/worker.go index 8f39f2c1..87377951 100644 --- a/posting/worker.go +++ b/posting/worker.go @@ -24,7 +24,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { task.ValueStart(b) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { - valoffset = b.CreateByteVector(nilbyte) + valoffset = b.CreateByteVector(x.Nilbyte) } else { valoffset = b.CreateByteVector(val) } @@ -70,10 +70,3 @@ func NewQuery(attr string, uids []uint64) []byte { b.Finish(qend) return b.Bytes[b.Head():] } - -var nilbyte []byte - -func init() { - nilbyte = make([]byte, 1) - nilbyte[0] = 0x00 -} diff --git a/query/query.go b/query/query.go index 4aff167d..ce413602 100644 --- a/query/query.go +++ b/query/query.go @@ -18,6 +18,7 @@ package query import ( "container/heap" + "encoding/json" "fmt" "github.com/Sirupsen/logrus" @@ -94,6 +95,109 @@ type SubGraph struct { result []byte } +func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} { + return []interface{}{i1, i2} +} + +func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { + result = make(map[uint64]interface{}) + // Get results from all children first. + cResult := make(map[uint64]interface{}) + + for _, child := range g.Children { + m, err := postTraverse(child) + if err != nil { + x.Err(glog, err).Error("Error while traversal") + return result, err + } + // Merge results from all children, one by one. + for k, v := range m { + if val, present := cResult[k]; !present { + cResult[k] = v + } else { + cResult[k] = mergeInterfaces(val, v) + } + } + } + + // Now read the query and results at current node. + uo := flatbuffers.GetUOffsetT(g.query) + q := new(task.Query) + q.Init(g.query, uo) + + ro := flatbuffers.GetUOffsetT(g.result) + r := new(task.Result) + r.Init(g.result, ro) + + if q.UidsLength() != r.UidmatrixLength() { + glog.Fatal("Result uidmatrixlength: %v. Query uidslength: %v", + r.UidmatrixLength(), q.UidsLength()) + } + if q.UidsLength() != r.ValuesLength() { + glog.Fatalf("Result valuelength: %v. Query uidslength: %v", + r.ValuesLength(), q.UidsLength()) + } + + var empty map[string]bool + var ul task.UidList + for i := 0; i < r.UidmatrixLength(); i++ { + if ok := r.Uidmatrix(&ul, i); !ok { + return result, fmt.Errorf("While parsing UidList") + } + l := make([]interface{}, ul.UidsLength()) + for j := 0; j < ul.UidsLength(); j++ { + uid := ul.Uids(j) + if ival, present := cResult[uid]; !present { + l[j] = empty + } else { + l[j] = ival + } + } + if len(l) > 0 { + result[q.Uids(i)] = l + } + } + + var tv task.Value + for i := 0; i < r.ValuesLength(); i++ { + if ok := r.Values(&tv, i); !ok { + return result, fmt.Errorf("While parsing value") + } + if tv.ValLength() == 1 && tv.ValBytes()[0] == 0x00 { + continue + } + var ival interface{} + if err := posting.ParseValue(ival, tv.ValBytes()); err != nil { + return result, err + } + if pval, present := result[q.Uids(i)]; present { + glog.WithField("prev", pval).Fatal("Previous value detected.") + } + m := make(map[string]interface{}) + m["uid"] = q.Uids(i) + m[g.Attr] = ival + result[q.Uids(i)] = m + } + return result, nil +} + +func (g *SubGraph) ToJson() (js []byte, rerr error) { + r, err := postTraverse(g) + if err != nil { + x.Err(glog, err).Error("While doing traversal") + return js, err + } + if len(r) == 1 { + for _, ival := range r { + return json.Marshal(ival) + } + } else { + glog.Fatal("We don't currently support more than 1 uid at root.") + } + + return json.Marshal(r) +} + /* func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) { var l []interface{} @@ -195,18 +299,34 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { b := flatbuffers.NewBuilder(0) omatrix := x.UidlistOffset(b, []uint64{euid}) + // Also need to add nil value to keep this consistent. + var voffset flatbuffers.UOffsetT + { + task.ValueStart(b) + bvo := b.CreateByteVector(x.Nilbyte) + task.ValueAddVal(b, bvo) + voffset = task.ValueEnd(b) + } + task.ResultStartUidmatrixVector(b, 1) b.PrependUOffsetT(omatrix) mend := b.EndVector(1) + task.ResultStartValuesVector(b, 1) + b.PrependUOffsetT(voffset) + vend := b.EndVector(1) + task.ResultStart(b) task.ResultAddUidmatrix(b, mend) + task.ResultAddValues(b, vend) rend := task.ResultEnd(b) b.Finish(rend) sg := new(SubGraph) sg.Attr = "_root_" sg.result = b.Bytes[b.Head():] + // Also add query for consistency and to allow for ToJson() later. + sg.query = createTaskQuery(sg.Attr, []uint64{euid}) return sg, nil } @@ -287,7 +407,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { func ProcessGraph(sg *SubGraph, rch chan error) { var err error - if len(sg.query) > 0 { + if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. sg.result, err = posting.ProcessTask(sg.query) if err != nil { diff --git a/query/query_test.go b/query/query_test.go index 0ee5018a..02535158 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -17,6 +17,7 @@ package query import ( + "fmt" "io/ioutil" "os" "testing" @@ -343,4 +344,10 @@ func TestProcessGraph(t *testing.T) { checkSingleValue(t, sg.Children[1], "name", "Michonne") checkSingleValue(t, sg.Children[2], "gender", "female") checkSingleValue(t, sg.Children[3], "status", "alive") + + js, err := sg.ToJson() + if err != nil { + t.Error(err) + } + fmt.Printf(string(js)) } diff --git a/x/x.go b/x/x.go index 3ab6e3b0..a9e29e48 100644 --- a/x/x.go +++ b/x/x.go @@ -90,3 +90,10 @@ func UidlistOffset(b *flatbuffers.Builder, task.UidListAddUids(b, ulist) return task.UidListEnd(b) } + +var Nilbyte []byte + +func init() { + Nilbyte = make([]byte, 1) + Nilbyte[0] = 0x00 +} -- GitLab