diff --git a/query/query.go b/query/query.go index 1dfa3791e7355a4e0bfd60c070a96e49bf546c22..e176db9305589ae819f52b7e9343c5401c9a582c 100644 --- a/query/query.go +++ b/query/query.go @@ -29,6 +29,7 @@ import ( "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/uid" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) @@ -405,7 +406,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query, pstore) + sg.result, err = worker.ProcessTask(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err diff --git a/query/query_test.go b/query/query_test.go index 3fd0c4023199188b9c6274e1f7fb82cadf8e84a9..5e732754994bcb4e020c08e5b90de92c1a0a1c83 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) @@ -102,6 +103,8 @@ func TestNewGraph(t *testing.T) { t.Error(err) } + worker.Init(ps) + uo := flatbuffers.GetUOffsetT(sg.result) r := new(task.Result) r.Init(sg.result, uo) @@ -131,6 +134,8 @@ func populateGraph(t *testing.T) (string, *store.Store) { ps := new(store.Store) ps.Init(dir) + worker.Init(ps) + clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() posting.Init(clog) diff --git a/server/main.go b/server/main.go index 643764210671baed31df18c532af145abf3c4022..71881b74ab1a4b642bb568ffaae6791b5443e5a0 100644 --- a/server/main.go +++ b/server/main.go @@ -30,6 +30,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -138,6 +139,7 @@ func main() { defer clog.Close() posting.Init(clog) + worker.Init(ps) http.HandleFunc("/query", queryHandler(ps)) glog.WithField("port", *port).Info("Listening for requests...") diff --git a/server/main_test.go b/server/main_test.go index 65fa66aa0c5c6b9e7d55ee62268b73f450fc6708..684159fc3d310ce75b312e820d2d8ae3806540a4 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/worker" ) var q0 = ` @@ -60,6 +61,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er clog.Init() posting.Init(clog) + worker.Init(ps) f, err := os.Open("testdata.nq") if err != nil { diff --git a/posting/worker.go b/worker/worker.go similarity index 86% rename from posting/worker.go rename to worker/worker.go index 154028e37b7e43d6aa36facfe200f9d4d7b65b4a..e654b7c892a128cbf689995019acebbfcae6d73d 100644 --- a/posting/worker.go +++ b/worker/worker.go @@ -1,13 +1,20 @@ -package posting +package worker import ( + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) -func ProcessTask(query []byte, pstore *store.Store) (result []byte, rerr error) { +var pstore *store.Store + +func Init(ps *store.Store) { + pstore = ps +} + +func ProcessTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) @@ -19,8 +26,8 @@ func ProcessTask(query []byte, pstore *store.Store) (result []byte, rerr error) attr := string(q.Attr()) for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) - key := Key(uid, attr) - pl := GetOrCreate(key, pstore) + key := posting.Key(uid, attr) + pl := posting.GetOrCreate(key, pstore) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { diff --git a/posting/worker_test.go b/worker/worker_test.go similarity index 73% rename from posting/worker_test.go rename to worker/worker_test.go index 65baa20982603c3fe28092f9554fbba7df84533c..06f4be87f02727da500464a9b077a47e34b3c8bf 100644 --- a/posting/worker_test.go +++ b/worker/worker_test.go @@ -1,4 +1,4 @@ -package posting +package worker import ( "fmt" @@ -8,14 +8,15 @@ import ( "time" "github.com/dgraph-io/dgraph/commit" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) -func addEdge(t *testing.T, edge x.DirectedEdge, l *List) { - if err := l.AddMutation(edge, Set); err != nil { +func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) { + if err := l.AddMutation(edge, posting.Set); err != nil { t.Error(err) } } @@ -56,32 +57,33 @@ func TestProcessTask(t *testing.T) { clog.Init() defer clog.Close() - Init(clog) + posting.Init(clog) + Init(ps) edge := x.DirectedEdge{ ValueId: 23, Source: "author0", Timestamp: time.Now(), } - addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(11, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(11, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 25 - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 26 - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 31 - addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.Value = "photon" - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) query := NewQuery("friend", []uint64{10, 11, 12}) - result, err := ProcessTask(query, ps) + result, err := ProcessTask(query) if err != nil { t.Error(err) } @@ -126,7 +128,7 @@ func TestProcessTask(t *testing.T) { t.Errorf("Unable to retrieve value") } var iout interface{} - if err := ParseValue(&iout, tval.ValBytes()); err != nil { + if err := posting.ParseValue(&iout, tval.ValBytes()); err != nil { t.Error(err) } v := iout.(string)