diff --git a/query/query.go b/query/query.go index 1524d3cfa28c8118d9c899043126035a1c52feb4..1dfa3791e7355a4e0bfd60c070a96e49bf546c22 100644 --- a/query/query.go +++ b/query/query.go @@ -262,8 +262,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) { } } -func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) { - sg, err := newGraph(gq.UID, gq.XID) +func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) { + sg, err := newGraph(gq.UID, gq.XID, pstore) if err != nil { return nil, err } @@ -271,11 +271,11 @@ func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) { return sg, nil } -func newGraph(euid uint64, exid string) (*SubGraph, error) { +func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") @@ -401,11 +401,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error) { +func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query) + sg.result, err = posting.ProcessTask(sg.query, pstore) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err @@ -444,7 +444,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.query = createTaskQuery(child.Attr, sorted) - go ProcessGraph(child, childchan) + go ProcessGraph(child, childchan, pstore) } // Now get all the results back. diff --git a/query/query_test.go b/query/query_test.go index c11f2b1aadc82909754b429757a1b9fc8856a8a7..3fd0c4023199188b9c6274e1f7fb82cadf8e84a9 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -88,7 +88,16 @@ func checkSingleValue(t *testing.T, child *SubGraph, func TestNewGraph(t *testing.T) { var ex uint64 ex = 101 - sg, err := newGraph(ex, "") + + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } + + ps := new(store.Store) + ps.Init(dir) + sg, err := newGraph(ex, "", ps) if err != nil { t.Error(err) } @@ -111,12 +120,12 @@ func TestNewGraph(t *testing.T) { } } -func populateGraph(t *testing.T) string { +func populateGraph(t *testing.T) (string, *store.Store) { // logrus.SetLevel(logrus.DebugLevel) dir, err := ioutil.TempDir("", "storetest_") if err != nil { t.Error(err) - return "" + return "", nil } ps := new(store.Store) @@ -124,7 +133,7 @@ func populateGraph(t *testing.T) string { clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() - posting.Init(ps, clog) + posting.Init(clog) // So, user we're interested in has uid: 1. // She has 4 friends: 23, 24, 25, 31, and 101 @@ -133,48 +142,48 @@ func populateGraph(t *testing.T) string { Source: "testing", Timestamp: time.Now(), } - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps)) edge.ValueId = 24 - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps)) edge.ValueId = 25 - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps)) edge.ValueId = 31 - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps)) edge.ValueId = 101 - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps)) // Now let's add a few properties for the main user. edge.Value = "Michonne" - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"), ps)) edge.Value = "female" - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"), ps)) edge.Value = "alive" - addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"), ps)) // Now let's add a name for each of the friends, except 101. edge.Value = "Rick Grimes" - addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"), ps)) edge.Value = "Glenn Rhee" - addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"), ps)) edge.Value = "Daryl Dixon" - addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"), ps)) edge.Value = "Andrea" - addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"), ps)) - return dir + return dir, ps } func TestProcessGraph(t *testing.T) { - dir := populateGraph(t) + dir, ps := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -194,13 +203,13 @@ func TestProcessGraph(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + sg, err := ToSubGraph(gq, ps) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch) + go ProcessGraph(sg, ch, ps) err = <-ch if err != nil { t.Error(err) @@ -265,7 +274,7 @@ func TestProcessGraph(t *testing.T) { } func TestToJson(t *testing.T) { - dir := populateGraph(t) + dir, ps := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -286,13 +295,13 @@ func TestToJson(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + sg, err := ToSubGraph(gq, ps) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch) + go ProcessGraph(sg, ch, ps) err = <-ch if err != nil { t.Error(err) diff --git a/server/main.go b/server/main.go index 5085ff9fdef228fa478b87c3cd792cd117d2008f..643764210671baed31df18c532af145abf3c4022 100644 --- a/server/main.go +++ b/server/main.go @@ -51,67 +51,69 @@ func addCorsHeaders(w http.ResponseWriter) { w.Header().Set("Connection", "close") } -func queryHandler(w http.ResponseWriter, r *http.Request) { - addCorsHeaders(w) - if r.Method == "OPTIONS" { - return +func queryHandler(ps *store.Store) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + addCorsHeaders(w) + if r.Method == "OPTIONS" { + return + } + if r.Method != "POST" { + x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method") + return + } + + var l query.Latency + l.Start = time.Now() + defer r.Body.Close() + q, err := ioutil.ReadAll(r.Body) + if err != nil || len(q) == 0 { + x.Err(glog, err).Error("While reading query") + x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") + return + } + + glog.WithField("q", string(q)).Debug("Query received.") + gq, err := gql.Parse(string(q)) + if err != nil { + x.Err(glog, err).Error("While parsing query") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + sg, err := query.ToSubGraph(gq, ps) + if err != nil { + x.Err(glog, err).Error("While conversion to internal format") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + l.Parsing = time.Since(l.Start) + glog.WithField("q", string(q)).Debug("Query parsed.") + + rch := make(chan error) + go query.ProcessGraph(sg, rch, ps) + err = <-rch + if err != nil { + x.Err(glog, err).Error("While executing query") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + l.Processing = time.Since(l.Start) - l.Parsing + glog.WithField("q", string(q)).Debug("Graph processed.") + js, err := sg.ToJson(&l) + if err != nil { + x.Err(glog, err).Error("While converting to Json.") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + glog.WithFields(logrus.Fields{ + "total": time.Since(l.Start), + "parsing": l.Parsing, + "process": l.Processing, + "json": l.Json, + }).Info("Query Latencies") + + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, string(js)) } - if r.Method != "POST" { - x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method") - return - } - - var l query.Latency - l.Start = time.Now() - defer r.Body.Close() - q, err := ioutil.ReadAll(r.Body) - if err != nil || len(q) == 0 { - x.Err(glog, err).Error("While reading query") - x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") - return - } - - glog.WithField("q", string(q)).Debug("Query received.") - gq, err := gql.Parse(string(q)) - if err != nil { - x.Err(glog, err).Error("While parsing query") - x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) - return - } - sg, err := query.ToSubGraph(gq) - if err != nil { - x.Err(glog, err).Error("While conversion to internal format") - x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) - return - } - l.Parsing = time.Since(l.Start) - glog.WithField("q", string(q)).Debug("Query parsed.") - - rch := make(chan error) - go query.ProcessGraph(sg, rch) - err = <-rch - if err != nil { - x.Err(glog, err).Error("While executing query") - x.SetStatus(w, x.E_ERROR, err.Error()) - return - } - l.Processing = time.Since(l.Start) - l.Parsing - glog.WithField("q", string(q)).Debug("Graph processed.") - js, err := sg.ToJson(&l) - if err != nil { - x.Err(glog, err).Error("While converting to Json.") - x.SetStatus(w, x.E_ERROR, err.Error()) - return - } - glog.WithFields(logrus.Fields{ - "total": time.Since(l.Start), - "parsing": l.Parsing, - "process": l.Processing, - "json": l.Json, - }).Info("Query Latencies") - - w.Header().Set("Content-Type", "application/json") - fmt.Fprint(w, string(js)) } func main() { @@ -135,9 +137,9 @@ func main() { clog.Init() defer clog.Close() - posting.Init(ps, clog) + posting.Init(clog) - http.HandleFunc("/query", queryHandler) + http.HandleFunc("/query", queryHandler(ps)) glog.WithField("port", *port).Info("Listening for requests...") if err := http.ListenAndServe(":"+*port, nil); err != nil { x.Err(glog, err).Fatal("ListenAndServe") diff --git a/server/main_test.go b/server/main_test.go index 0c384f160c57d5b934f2ee43921a92b82295f601..65fa66aa0c5c6b9e7d55ee62268b73f450fc6708 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -43,34 +43,34 @@ var q0 = ` } ` -func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) { +func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr error) { var err error dir1, err = ioutil.TempDir("", "storetest_") if err != nil { - return "", "", nil, err + return "", "", nil, nil, err } - ps := new(store.Store) + ps = new(store.Store) ps.Init(dir1) dir2, err = ioutil.TempDir("", "storemuts_") if err != nil { - return dir1, "", nil, err + return dir1, "", nil, nil, err } clog = commit.NewLogger(dir2, "mutations", 50<<20) clog.Init() - posting.Init(ps, clog) + posting.Init(clog) f, err := os.Open("testdata.nq") if err != nil { - return dir1, dir2, clog, err + return dir1, dir2, nil, clog, err } defer f.Close() - _, err = loader.HandleRdfReader(f, 0, 1) + _, err = loader.HandleRdfReader(f, 0, 1, ps, ps) if err != nil { - return dir1, dir2, clog, err + return dir1, dir2, nil, clog, err } - return dir1, dir2, clog, nil + return dir1, dir2, ps, clog, nil } func closeAll(dir1, dir2 string, clog *commit.Logger) { @@ -80,7 +80,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) { } func TestQuery(t *testing.T) { - dir1, dir2, clog, err := prepare() + dir1, dir2, ps, clog, err := prepare() if err != nil { t.Error(err) return @@ -93,7 +93,7 @@ func TestQuery(t *testing.T) { t.Error(err) return } - g, err := query.ToSubGraph(gq) + g, err := query.ToSubGraph(gq, ps) if err != nil { t.Error(err) return @@ -137,7 +137,7 @@ func TestQuery(t *testing.T) { } ch := make(chan error) - go query.ProcessGraph(g, ch) + go query.ProcessGraph(g, ch, ps) if err := <-ch; err != nil { t.Error(err) return @@ -175,7 +175,7 @@ var q1 = ` ` func BenchmarkQuery(b *testing.B) { - dir1, dir2, clog, err := prepare() + dir1, dir2, ps, clog, err := prepare() if err != nil { b.Error(err) return @@ -189,14 +189,14 @@ func BenchmarkQuery(b *testing.B) { b.Error(err) return } - g, err := query.ToSubGraph(gq) + g, err := query.ToSubGraph(gq, ps) if err != nil { b.Error(err) return } ch := make(chan error) - go query.ProcessGraph(g, ch) + go query.ProcessGraph(g, ch, ps) if err := <-ch; err != nil { b.Error(err) return