From 7880b90d5613bccab644fb73c3754eaba3ebabb3 Mon Sep 17 00:00:00 2001 From: Ashwin <ashwin2007ray@gmail.com> Date: Sat, 13 Feb 2016 05:37:16 +1100 Subject: [PATCH] Moved worker.go to a separate module worker --- query/query.go | 3 ++- query/query_test.go | 5 +++++ server/main.go | 2 ++ server/main_test.go | 2 ++ {posting => worker}/worker.go | 15 +++++++++++---- {posting => worker}/worker_test.go | 30 ++++++++++++++++-------------- 6 files changed, 38 insertions(+), 19 deletions(-) rename {posting => worker}/worker.go (86%) rename {posting => worker}/worker_test.go (73%) diff --git a/query/query.go b/query/query.go index 1dfa3791..e176db93 100644 --- a/query/query.go +++ b/query/query.go @@ -29,6 +29,7 @@ import ( "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/uid" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) @@ -405,7 +406,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query, pstore) + sg.result, err = worker.ProcessTask(sg.query) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err diff --git a/query/query_test.go b/query/query_test.go index 3fd0c402..5e732754 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) @@ -102,6 +103,8 @@ func TestNewGraph(t *testing.T) { t.Error(err) } + worker.Init(ps) + uo := flatbuffers.GetUOffsetT(sg.result) r := new(task.Result) r.Init(sg.result, uo) @@ -131,6 +134,8 @@ func populateGraph(t *testing.T) (string, *store.Store) { ps := new(store.Store) ps.Init(dir) + worker.Init(ps) + clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() posting.Init(clog) diff --git a/server/main.go b/server/main.go index 64376421..71881b74 100644 --- a/server/main.go +++ b/server/main.go @@ -30,6 +30,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -138,6 +139,7 @@ func main() { defer clog.Close() posting.Init(clog) + worker.Init(ps) http.HandleFunc("/query", queryHandler(ps)) glog.WithField("port", *port).Info("Listening for requests...") diff --git a/server/main_test.go b/server/main_test.go index 65fa66aa..684159fc 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/worker" ) var q0 = ` @@ -60,6 +61,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er clog.Init() posting.Init(clog) + worker.Init(ps) f, err := os.Open("testdata.nq") if err != nil { diff --git a/posting/worker.go b/worker/worker.go similarity index 86% rename from posting/worker.go rename to worker/worker.go index 154028e3..e654b7c8 100644 --- a/posting/worker.go +++ b/worker/worker.go @@ -1,13 +1,20 @@ -package posting +package worker import ( + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) -func ProcessTask(query []byte, pstore *store.Store) (result []byte, rerr error) { +var pstore *store.Store + +func Init(ps *store.Store) { + pstore = ps +} + +func ProcessTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) @@ -19,8 +26,8 @@ func ProcessTask(query []byte, pstore *store.Store) (result []byte, rerr error) attr := string(q.Attr()) for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) - key := Key(uid, attr) - pl := GetOrCreate(key, pstore) + key := posting.Key(uid, attr) + pl := posting.GetOrCreate(key, pstore) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { diff --git a/posting/worker_test.go b/worker/worker_test.go similarity index 73% rename from posting/worker_test.go rename to worker/worker_test.go index 65baa209..06f4be87 100644 --- a/posting/worker_test.go +++ b/worker/worker_test.go @@ -1,4 +1,4 @@ -package posting +package worker import ( "fmt" @@ -8,14 +8,15 @@ import ( "time" "github.com/dgraph-io/dgraph/commit" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) -func addEdge(t *testing.T, edge x.DirectedEdge, l *List) { - if err := l.AddMutation(edge, Set); err != nil { +func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) { + if err := l.AddMutation(edge, posting.Set); err != nil { t.Error(err) } } @@ -56,32 +57,33 @@ func TestProcessTask(t *testing.T) { clog.Init() defer clog.Close() - Init(clog) + posting.Init(clog) + Init(ps) edge := x.DirectedEdge{ ValueId: 23, Source: "author0", Timestamp: time.Now(), } - addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(11, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(11, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 25 - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 26 - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.ValueId = 31 - addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) edge.Value = "photon" - addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) + addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps)) query := NewQuery("friend", []uint64{10, 11, 12}) - result, err := ProcessTask(query, ps) + result, err := ProcessTask(query) if err != nil { t.Error(err) } @@ -126,7 +128,7 @@ func TestProcessTask(t *testing.T) { t.Errorf("Unable to retrieve value") } var iout interface{} - if err := ParseValue(&iout, tval.ValBytes()); err != nil { + if err := posting.ParseValue(&iout, tval.ValBytes()); err != nil { t.Error(err) } v := iout.(string) -- GitLab