From 74db725d7452828393d8f493ee149f3d9af2714c Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Fri, 23 Oct 2015 11:49:12 +1100
Subject: [PATCH] Code for processing SubGraph. Testing needed.

---
 posting/worker.go      |  2 +-
 posting/worker_test.go |  4 +--
 query/query.go         | 65 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/posting/worker.go b/posting/worker.go
index 04392d9b..56529b78 100644
--- a/posting/worker.go
+++ b/posting/worker.go
@@ -37,7 +37,7 @@ func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
 	return b.EndVector(len(sorted))
 }
 
-func ProcessQuery(query []byte) (result []byte, rerr error) {
+func ProcessTask(query []byte) (result []byte, rerr error) {
 	uo := flatbuffers.GetUOffsetT(query)
 	q := new(task.Query)
 	q.Init(query, uo)
diff --git a/posting/worker_test.go b/posting/worker_test.go
index 99fb2391..010afc5f 100644
--- a/posting/worker_test.go
+++ b/posting/worker_test.go
@@ -68,7 +68,7 @@ func addTriple(t *testing.T, triple x.Triple, l *List) {
 	}
 }
 
-func TestProcessQuery(t *testing.T) {
+func TestProcessTask(t *testing.T) {
 	logrus.SetLevel(logrus.DebugLevel)
 
 	pdir := NewStore(t)
@@ -105,7 +105,7 @@ func TestProcessQuery(t *testing.T) {
 	addTriple(t, triple, Get(Key(12, "friend")))
 
 	query := NewQuery("friend", []uint64{10, 11, 12})
-	result, err := ProcessQuery(query)
+	result, err := ProcessTask(query)
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/query/query.go b/query/query.go
index 122d7b39..1cec4f2e 100644
--- a/query/query.go
+++ b/query/query.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 
 	"github.com/google/flatbuffers/go"
+	"github.com/manishrjain/dgraph/posting"
 	"github.com/manishrjain/dgraph/task"
 	"github.com/manishrjain/dgraph/uid"
 	"github.com/manishrjain/dgraph/x"
@@ -125,3 +126,67 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
 	sg.result = b.Bytes[b.Head():]
 	return sg, nil
 }
+
+func createTaskQuery(attr string, r *task.Result) []byte {
+	b := flatbuffers.NewBuilder(0)
+	ao := b.CreateString(attr)
+
+	task.QueryStartUidsVector(b, r.UidsLength())
+	for i := r.UidsLength() - 1; i >= 0; i-- {
+		uid := r.Uids(i)
+		b.PrependUint64(uid)
+	}
+	vend := b.EndVector(r.UidsLength())
+
+	task.QueryStart(b)
+	task.QueryAddAttr(b, ao)
+	task.QueryAddUids(b, vend)
+	qend := task.QueryEnd(b)
+	b.Finish(qend)
+	return b.Bytes[b.Head():]
+}
+
+func ProcessGraph(sg *SubGraph, rch chan error) {
+	var err error
+	if len(sg.query) > 0 {
+		// This task execution would go over the wire in later versions.
+		sg.result, err = posting.ProcessTask(sg.query)
+		if err != nil {
+			rch <- err
+			return
+		}
+	}
+
+	uo := flatbuffers.GetUOffsetT(sg.result)
+	r := new(task.Result)
+	r.Init(sg.result, uo)
+	if r.UidsLength() == 0 {
+		// Looks like we're done here.
+		if len(sg.Children) > 0 {
+			log.Debug("Have some children but no results. Life got cut short early.")
+		}
+		rch <- nil
+		return
+	}
+
+	// Let's execute it in a tree fashion. Each SubGraph would break off
+	// as many goroutines as it's children; which would then recursively
+	// do the same thing.
+	// Buffered channel to ensure no-blockage.
+	childchan := make(chan error, len(sg.Children))
+	for i := 0; i < len(sg.Children); i++ {
+		child := sg.Children[i]
+		child.query = createTaskQuery(child.Attr, r)
+		go ProcessGraph(child, childchan)
+	}
+
+	// Now get all the results back.
+	for i := 0; i < len(sg.Children); i++ {
+		err = <-childchan
+		if err != nil {
+			rch <- err
+			return
+		}
+	}
+	rch <- nil
+}
-- 
GitLab