Skip to content
Snippets Groups Projects
Commit 7ffebae5 authored by Manish R Jain's avatar Manish R Jain
Browse files

Welcome to a breathing ToJson. Dgraph has taken it's first small step to mankind.

parent ecdb53ab
No related branches found
No related tags found
No related merge requests found
...@@ -24,7 +24,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { ...@@ -24,7 +24,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
task.ValueStart(b) task.ValueStart(b)
var valoffset flatbuffers.UOffsetT var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil { if val, err := pl.Value(); err != nil {
valoffset = b.CreateByteVector(nilbyte) valoffset = b.CreateByteVector(x.Nilbyte)
} else { } else {
valoffset = b.CreateByteVector(val) valoffset = b.CreateByteVector(val)
} }
...@@ -70,10 +70,3 @@ func NewQuery(attr string, uids []uint64) []byte { ...@@ -70,10 +70,3 @@ func NewQuery(attr string, uids []uint64) []byte {
b.Finish(qend) b.Finish(qend)
return b.Bytes[b.Head():] return b.Bytes[b.Head():]
} }
var nilbyte []byte
func init() {
nilbyte = make([]byte, 1)
nilbyte[0] = 0x00
}
...@@ -18,6 +18,7 @@ package query ...@@ -18,6 +18,7 @@ package query
import ( import (
"container/heap" "container/heap"
"encoding/json"
"fmt" "fmt"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
...@@ -94,6 +95,109 @@ type SubGraph struct { ...@@ -94,6 +95,109 @@ type SubGraph struct {
result []byte result []byte
} }
func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} {
return []interface{}{i1, i2}
}
func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) {
result = make(map[uint64]interface{})
// Get results from all children first.
cResult := make(map[uint64]interface{})
for _, child := range g.Children {
m, err := postTraverse(child)
if err != nil {
x.Err(glog, err).Error("Error while traversal")
return result, err
}
// Merge results from all children, one by one.
for k, v := range m {
if val, present := cResult[k]; !present {
cResult[k] = v
} else {
cResult[k] = mergeInterfaces(val, v)
}
}
}
// Now read the query and results at current node.
uo := flatbuffers.GetUOffsetT(g.query)
q := new(task.Query)
q.Init(g.query, uo)
ro := flatbuffers.GetUOffsetT(g.result)
r := new(task.Result)
r.Init(g.result, ro)
if q.UidsLength() != r.UidmatrixLength() {
glog.Fatal("Result uidmatrixlength: %v. Query uidslength: %v",
r.UidmatrixLength(), q.UidsLength())
}
if q.UidsLength() != r.ValuesLength() {
glog.Fatalf("Result valuelength: %v. Query uidslength: %v",
r.ValuesLength(), q.UidsLength())
}
var empty map[string]bool
var ul task.UidList
for i := 0; i < r.UidmatrixLength(); i++ {
if ok := r.Uidmatrix(&ul, i); !ok {
return result, fmt.Errorf("While parsing UidList")
}
l := make([]interface{}, ul.UidsLength())
for j := 0; j < ul.UidsLength(); j++ {
uid := ul.Uids(j)
if ival, present := cResult[uid]; !present {
l[j] = empty
} else {
l[j] = ival
}
}
if len(l) > 0 {
result[q.Uids(i)] = l
}
}
var tv task.Value
for i := 0; i < r.ValuesLength(); i++ {
if ok := r.Values(&tv, i); !ok {
return result, fmt.Errorf("While parsing value")
}
if tv.ValLength() == 1 && tv.ValBytes()[0] == 0x00 {
continue
}
var ival interface{}
if err := posting.ParseValue(ival, tv.ValBytes()); err != nil {
return result, err
}
if pval, present := result[q.Uids(i)]; present {
glog.WithField("prev", pval).Fatal("Previous value detected.")
}
m := make(map[string]interface{})
m["uid"] = q.Uids(i)
m[g.Attr] = ival
result[q.Uids(i)] = m
}
return result, nil
}
func (g *SubGraph) ToJson() (js []byte, rerr error) {
r, err := postTraverse(g)
if err != nil {
x.Err(glog, err).Error("While doing traversal")
return js, err
}
if len(r) == 1 {
for _, ival := range r {
return json.Marshal(ival)
}
} else {
glog.Fatal("We don't currently support more than 1 uid at root.")
}
return json.Marshal(r)
}
/* /*
func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) { func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) {
var l []interface{} var l []interface{}
...@@ -195,18 +299,34 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { ...@@ -195,18 +299,34 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
omatrix := x.UidlistOffset(b, []uint64{euid}) omatrix := x.UidlistOffset(b, []uint64{euid})
// Also need to add nil value to keep this consistent.
var voffset flatbuffers.UOffsetT
{
task.ValueStart(b)
bvo := b.CreateByteVector(x.Nilbyte)
task.ValueAddVal(b, bvo)
voffset = task.ValueEnd(b)
}
task.ResultStartUidmatrixVector(b, 1) task.ResultStartUidmatrixVector(b, 1)
b.PrependUOffsetT(omatrix) b.PrependUOffsetT(omatrix)
mend := b.EndVector(1) mend := b.EndVector(1)
task.ResultStartValuesVector(b, 1)
b.PrependUOffsetT(voffset)
vend := b.EndVector(1)
task.ResultStart(b) task.ResultStart(b)
task.ResultAddUidmatrix(b, mend) task.ResultAddUidmatrix(b, mend)
task.ResultAddValues(b, vend)
rend := task.ResultEnd(b) rend := task.ResultEnd(b)
b.Finish(rend) b.Finish(rend)
sg := new(SubGraph) sg := new(SubGraph)
sg.Attr = "_root_" sg.Attr = "_root_"
sg.result = b.Bytes[b.Head():] sg.result = b.Bytes[b.Head():]
// Also add query for consistency and to allow for ToJson() later.
sg.query = createTaskQuery(sg.Attr, []uint64{euid})
return sg, nil return sg, nil
} }
...@@ -287,7 +407,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { ...@@ -287,7 +407,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
func ProcessGraph(sg *SubGraph, rch chan error) { func ProcessGraph(sg *SubGraph, rch chan error) {
var err error var err error
if len(sg.query) > 0 { if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions. // This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query) sg.result, err = posting.ProcessTask(sg.query)
if err != nil { if err != nil {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package query package query
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
...@@ -343,4 +344,10 @@ func TestProcessGraph(t *testing.T) { ...@@ -343,4 +344,10 @@ func TestProcessGraph(t *testing.T) {
checkSingleValue(t, sg.Children[1], "name", "Michonne") checkSingleValue(t, sg.Children[1], "name", "Michonne")
checkSingleValue(t, sg.Children[2], "gender", "female") checkSingleValue(t, sg.Children[2], "gender", "female")
checkSingleValue(t, sg.Children[3], "status", "alive") checkSingleValue(t, sg.Children[3], "status", "alive")
js, err := sg.ToJson()
if err != nil {
t.Error(err)
}
fmt.Printf(string(js))
} }
...@@ -90,3 +90,10 @@ func UidlistOffset(b *flatbuffers.Builder, ...@@ -90,3 +90,10 @@ func UidlistOffset(b *flatbuffers.Builder,
task.UidListAddUids(b, ulist) task.UidListAddUids(b, ulist)
return task.UidListEnd(b) return task.UidListEnd(b)
} }
var Nilbyte []byte
func init() {
Nilbyte = make([]byte, 1)
Nilbyte[0] = 0x00
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment