diff --git a/query/query.go b/query/query.go index a2a2d981744dc24c93922691c972fa0dba309ee6..b063d850c1df14a8a0845c59f039682562760c57 100644 --- a/query/query.go +++ b/query/query.go @@ -26,7 +26,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/worker" @@ -263,8 +262,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) { } } -func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) { - sg, err := newGraph(gq.UID, gq.XID, pstore) +func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) { + sg, err := newGraph(gq.UID, gq.XID) if err != nil { return nil, err } @@ -272,11 +271,12 @@ func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) { return sg, nil } -func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) { +func newGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default + // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1) if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") @@ -402,7 +402,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { +func ProcessGraph(sg *SubGraph, rch chan error) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. @@ -445,7 +445,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.query = createTaskQuery(child.Attr, sorted) - go ProcessGraph(child, childchan, pstore) + go ProcessGraph(child, childchan) } // Now get all the results back. diff --git a/query/query_test.go b/query/query_test.go index 12713917a56d1584f536411b74d0339ef44d8780..00393e47e0cb592c20bb80f72eb9231eeac94fa5 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -98,7 +98,7 @@ func TestNewGraph(t *testing.T) { ps := new(store.Store) ps.Init(dir) - sg, err := newGraph(ex, "", ps) + sg, err := newGraph(ex, "") if err != nil { t.Error(err) } @@ -188,7 +188,7 @@ func populateGraph(t *testing.T) (string, *store.Store) { } func TestProcessGraph(t *testing.T) { - dir, ps := populateGraph(t) + dir, _ := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -208,13 +208,13 @@ func TestProcessGraph(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq, ps) + sg, err := ToSubGraph(gq) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch, ps) + go ProcessGraph(sg, ch) err = <-ch if err != nil { t.Error(err) @@ -279,7 +279,7 @@ func TestProcessGraph(t *testing.T) { } func TestToJson(t *testing.T) { - dir, ps := populateGraph(t) + dir, _ := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -300,13 +300,13 @@ func TestToJson(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq, ps) + sg, err := ToSubGraph(gq) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch, ps) + go ProcessGraph(sg, ch) err = <-ch if err != nil { t.Error(err) diff --git a/server/main.go b/server/main.go index c1c122ef0df96877a027fb2d984d2418db028bb7..72feeaab8322562e1a71f9c6d0763d3900c98c3e 100644 --- a/server/main.go +++ b/server/main.go @@ -58,69 +58,67 @@ func addCorsHeaders(w http.ResponseWriter) { w.Header().Set("Connection", "close") } -func queryHandler(ps *store.Store) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - addCorsHeaders(w) - if r.Method == "OPTIONS" { - return - } - if r.Method != "POST" { - x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method") - return - } - - var l query.Latency - l.Start = time.Now() - defer r.Body.Close() - q, err := ioutil.ReadAll(r.Body) - if err != nil || len(q) == 0 { - x.Err(glog, err).Error("While reading query") - x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") - return - } - - glog.WithField("q", string(q)).Debug("Query received.") - gq, _, err := gql.Parse(string(q)) - if err != nil { - x.Err(glog, err).Error("While parsing query") - x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) - return - } - sg, err := query.ToSubGraph(gq, ps) - if err != nil { - x.Err(glog, err).Error("While conversion to internal format") - x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) - return - } - l.Parsing = time.Since(l.Start) - glog.WithField("q", string(q)).Debug("Query parsed.") - - rch := make(chan error) - go query.ProcessGraph(sg, rch, ps) - err = <-rch - if err != nil { - x.Err(glog, err).Error("While executing query") - x.SetStatus(w, x.E_ERROR, err.Error()) - return - } - l.Processing = time.Since(l.Start) - l.Parsing - glog.WithField("q", string(q)).Debug("Graph processed.") - js, err := sg.ToJson(&l) - if err != nil { - x.Err(glog, err).Error("While converting to Json.") - x.SetStatus(w, x.E_ERROR, err.Error()) - return - } - glog.WithFields(logrus.Fields{ - "total": time.Since(l.Start), - "parsing": l.Parsing, - "process": l.Processing, - "json": l.Json, - }).Info("Query Latencies") - - w.Header().Set("Content-Type", "application/json") - fmt.Fprint(w, string(js)) +func queryHandler(w http.ResponseWriter, r *http.Request) { + addCorsHeaders(w) + if r.Method == "OPTIONS" { + return } + if r.Method != "POST" { + x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method") + return + } + + var l query.Latency + l.Start = time.Now() + defer r.Body.Close() + q, err := ioutil.ReadAll(r.Body) + if err != nil || len(q) == 0 { + x.Err(glog, err).Error("While reading query") + x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") + return + } + + glog.WithField("q", string(q)).Debug("Query received.") + gq, _, err := gql.Parse(string(q)) + if err != nil { + x.Err(glog, err).Error("While parsing query") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + sg, err := query.ToSubGraph(gq) + if err != nil { + x.Err(glog, err).Error("While conversion to internal format") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + l.Parsing = time.Since(l.Start) + glog.WithField("q", string(q)).Debug("Query parsed.") + + rch := make(chan error) + go query.ProcessGraph(sg, rch) + err = <-rch + if err != nil { + x.Err(glog, err).Error("While executing query") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + l.Processing = time.Since(l.Start) - l.Parsing + glog.WithField("q", string(q)).Debug("Graph processed.") + js, err := sg.ToJson(&l) + if err != nil { + x.Err(glog, err).Error("While converting to Json.") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + glog.WithFields(logrus.Fields{ + "total": time.Since(l.Start), + "parsing": l.Parsing, + "process": l.Processing, + "json": l.Json, + }).Info("Query Latencies") + + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, string(js)) } func main() { @@ -150,13 +148,13 @@ func main() { xiduidStore.Init(*xiduidDir) defer xiduidStore.Close() worker.Init(ps, xiduidStore) //Only server instance 0 will have xiduidStore + uid.Init(xiduidStore) } else { worker.Init(ps, nil) } worker.Connect() - uid.Init(ps) - http.HandleFunc("/query", queryHandler(ps)) + http.HandleFunc("/query", queryHandler) glog.WithField("port", *port).Info("Listening for requests...") if err := http.ListenAndServe(":"+*port, nil); err != nil { x.Err(glog, err).Fatal("ListenAndServe") diff --git a/server/main_test.go b/server/main_test.go index f2180849d5b0bc5243eec7c8235a984bb6e7e8e2..5e6a9f81751a4905211a169361486532516ab2ea 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -85,7 +85,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) { } func TestQuery(t *testing.T) { - dir1, dir2, ps, clog, err := prepare() + dir1, dir2, _, clog, err := prepare() if err != nil { t.Error(err) return @@ -98,7 +98,7 @@ func TestQuery(t *testing.T) { t.Error(err) return } - g, err := query.ToSubGraph(gq, ps) + g, err := query.ToSubGraph(gq) if err != nil { t.Error(err) return @@ -142,7 +142,7 @@ func TestQuery(t *testing.T) { } ch := make(chan error) - go query.ProcessGraph(g, ch, ps) + go query.ProcessGraph(g, ch) if err := <-ch; err != nil { t.Error(err) return @@ -180,7 +180,7 @@ var q1 = ` ` func BenchmarkQuery(b *testing.B) { - dir1, dir2, ps, clog, err := prepare() + dir1, dir2, _, clog, err := prepare() if err != nil { b.Error(err) return @@ -194,14 +194,14 @@ func BenchmarkQuery(b *testing.B) { b.Error(err) return } - g, err := query.ToSubGraph(gq, ps) + g, err := query.ToSubGraph(gq) if err != nil { b.Error(err) return } ch := make(chan error) - go query.ProcessGraph(g, ch, ps) + go query.ProcessGraph(g, ch) if err := <-ch; err != nil { b.Error(err) return