diff --git a/posting/worker.go b/posting/worker.go index 04392d9b1a09f4124975139cb96deae4372ade5c..56529b78301bb63b0da3e8524adcfeaff4bda551 100644 --- a/posting/worker.go +++ b/posting/worker.go @@ -37,7 +37,7 @@ func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT { return b.EndVector(len(sorted)) } -func ProcessQuery(query []byte) (result []byte, rerr error) { +func ProcessTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) diff --git a/posting/worker_test.go b/posting/worker_test.go index 99fb2391bccc168b1d39578e53aba3d72db89bc2..010afc5f981c10f138d63ae565b4680aa5dd9a19 100644 --- a/posting/worker_test.go +++ b/posting/worker_test.go @@ -68,7 +68,7 @@ func addTriple(t *testing.T, triple x.Triple, l *List) { } } -func TestProcessQuery(t *testing.T) { +func TestProcessTask(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) pdir := NewStore(t) @@ -105,7 +105,7 @@ func TestProcessQuery(t *testing.T) { addTriple(t, triple, Get(Key(12, "friend"))) query := NewQuery("friend", []uint64{10, 11, 12}) - result, err := ProcessQuery(query) + result, err := ProcessTask(query) if err != nil { t.Error(err) } diff --git a/query/query.go b/query/query.go index 122d7b396a7ad942a99e9188985fa19e9d7c9d05..1cec4f2e30091152a79e88ebe0b09f6d89113fcf 100644 --- a/query/query.go +++ b/query/query.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/google/flatbuffers/go" + "github.com/manishrjain/dgraph/posting" "github.com/manishrjain/dgraph/task" "github.com/manishrjain/dgraph/uid" "github.com/manishrjain/dgraph/x" @@ -125,3 +126,67 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { sg.result = b.Bytes[b.Head():] return sg, nil } + +func createTaskQuery(attr string, r *task.Result) []byte { + b := flatbuffers.NewBuilder(0) + ao := b.CreateString(attr) + + task.QueryStartUidsVector(b, r.UidsLength()) + for i := r.UidsLength() - 1; i >= 0; i-- { + uid := r.Uids(i) + b.PrependUint64(uid) + } + vend := b.EndVector(r.UidsLength()) + + task.QueryStart(b) + task.QueryAddAttr(b, ao) + task.QueryAddUids(b, vend) + qend := task.QueryEnd(b) + b.Finish(qend) + return b.Bytes[b.Head():] +} + +func ProcessGraph(sg *SubGraph, rch chan error) { + var err error + if len(sg.query) > 0 { + // This task execution would go over the wire in later versions. + sg.result, err = posting.ProcessTask(sg.query) + if err != nil { + rch <- err + return + } + } + + uo := flatbuffers.GetUOffsetT(sg.result) + r := new(task.Result) + r.Init(sg.result, uo) + if r.UidsLength() == 0 { + // Looks like we're done here. + if len(sg.Children) > 0 { + log.Debug("Have some children but no results. Life got cut short early.") + } + rch <- nil + return + } + + // Let's execute it in a tree fashion. Each SubGraph would break off + // as many goroutines as it's children; which would then recursively + // do the same thing. + // Buffered channel to ensure no-blockage. + childchan := make(chan error, len(sg.Children)) + for i := 0; i < len(sg.Children); i++ { + child := sg.Children[i] + child.query = createTaskQuery(child.Attr, r) + go ProcessGraph(child, childchan) + } + + // Now get all the results back. + for i := 0; i < len(sg.Children); i++ { + err = <-childchan + if err != nil { + rch <- err + return + } + } + rch <- nil +}