Skip to content
Snippets Groups Projects
Commit 74db725d authored by Manish R Jain's avatar Manish R Jain
Browse files

Code for processing SubGraph. Testing needed.

parent c9fd7838
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
return b.EndVector(len(sorted))
}
func ProcessQuery(query []byte) (result []byte, rerr error) {
func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query)
q.Init(query, uo)
......
......@@ -68,7 +68,7 @@ func addTriple(t *testing.T, triple x.Triple, l *List) {
}
}
func TestProcessQuery(t *testing.T) {
func TestProcessTask(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
pdir := NewStore(t)
......@@ -105,7 +105,7 @@ func TestProcessQuery(t *testing.T) {
addTriple(t, triple, Get(Key(12, "friend")))
query := NewQuery("friend", []uint64{10, 11, 12})
result, err := ProcessQuery(query)
result, err := ProcessTask(query)
if err != nil {
t.Error(err)
}
......
......@@ -20,6 +20,7 @@ import (
"fmt"
"github.com/google/flatbuffers/go"
"github.com/manishrjain/dgraph/posting"
"github.com/manishrjain/dgraph/task"
"github.com/manishrjain/dgraph/uid"
"github.com/manishrjain/dgraph/x"
......@@ -125,3 +126,67 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
sg.result = b.Bytes[b.Head():]
return sg, nil
}
func createTaskQuery(attr string, r *task.Result) []byte {
b := flatbuffers.NewBuilder(0)
ao := b.CreateString(attr)
task.QueryStartUidsVector(b, r.UidsLength())
for i := r.UidsLength() - 1; i >= 0; i-- {
uid := r.Uids(i)
b.PrependUint64(uid)
}
vend := b.EndVector(r.UidsLength())
task.QueryStart(b)
task.QueryAddAttr(b, ao)
task.QueryAddUids(b, vend)
qend := task.QueryEnd(b)
b.Finish(qend)
return b.Bytes[b.Head():]
}
func ProcessGraph(sg *SubGraph, rch chan error) {
var err error
if len(sg.query) > 0 {
// This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query)
if err != nil {
rch <- err
return
}
}
uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
r.Init(sg.result, uo)
if r.UidsLength() == 0 {
// Looks like we're done here.
if len(sg.Children) > 0 {
log.Debug("Have some children but no results. Life got cut short early.")
}
rch <- nil
return
}
// Let's execute it in a tree fashion. Each SubGraph would break off
// as many goroutines as it's children; which would then recursively
// do the same thing.
// Buffered channel to ensure no-blockage.
childchan := make(chan error, len(sg.Children))
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]
child.query = createTaskQuery(child.Attr, r)
go ProcessGraph(child, childchan)
}
// Now get all the results back.
for i := 0; i < len(sg.Children); i++ {
err = <-childchan
if err != nil {
rch <- err
return
}
}
rch <- nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment