diff --git a/query/query.go b/query/query.go
index 41d18524ddfa98cc25970cd7c79f856ef3de077b..c5ea0bd21dc9e7757b5023425fd9d4332d8fd0f2 100644
--- a/query/query.go
+++ b/query/query.go
@@ -26,7 +26,6 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/gql"
 	"github.com/dgraph-io/dgraph/posting"
-	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/uid"
 	"github.com/dgraph-io/dgraph/worker"
@@ -263,8 +262,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
 	}
 }
 
-func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
-	sg, err := newGraph(gq.UID, gq.XID, pstore)
+func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
+	sg, err := newGraph(gq.UID, gq.XID)
 	if err != nil {
 		return nil, err
 	}
@@ -272,11 +271,12 @@ func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
 	return sg, nil
 }
 
-func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
+func newGraph(euid uint64, exid string) (*SubGraph, error) {
 	// This would set the Result field in SubGraph,
 	// and populate the children for attributes.
 	if len(exid) > 0 {
-		u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default
+		// instanceIdx = 0, numInstances = 1 by default
+		u, err := uid.GetOrAssign(exid, 0, 1)
 		if err != nil {
 			x.Err(glog, err).WithField("xid", exid).Error(
 				"While GetOrAssign uid from external id")
@@ -402,7 +402,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
 	return sorted, nil
 }
 
-func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
+func ProcessGraph(sg *SubGraph, rch chan error) {
 	var err error
 	if len(sg.query) > 0 && sg.Attr != "_root_" {
 		// This task execution would go over the wire in later versions.
@@ -445,7 +445,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
 	for i := 0; i < len(sg.Children); i++ {
 		child := sg.Children[i]
 		child.query = createTaskQuery(child.Attr, sorted)
-		go ProcessGraph(child, childchan, pstore)
+		go ProcessGraph(child, childchan)
 	}
 
 	// Now get all the results back.
diff --git a/query/query_test.go b/query/query_test.go
index 007047a439c3138e2ea152de6cde211cb1cde4cd..397bcaa04f13ec318ff8c2a99c9bbbd6979ace71 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -98,12 +98,12 @@ func TestNewGraph(t *testing.T) {
 	ps := new(store.Store)
 	ps.Init(dir)
 
-	sg, err := newGraph(ex, "", ps)
+	sg, err := newGraph(ex, "")
 	if err != nil {
 		t.Error(err)
 	}
 
-	worker.Init(ps)
+	worker.Init(ps, nil, nil)
 
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
 	ps := new(store.Store)
 	ps.Init(dir)
 
-	worker.Init(ps)
+	worker.Init(ps, nil, nil)
 
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
@@ -188,7 +188,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
 }
 
 func TestProcessGraph(t *testing.T) {
-	dir, ps := populateGraph(t)
+	dir, _ := populateGraph(t)
 	defer os.RemoveAll(dir)
 
 	// Alright. Now we have everything set up. Let's create the query.
@@ -208,13 +208,13 @@ func TestProcessGraph(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-	sg, err := ToSubGraph(gq, ps)
+	sg, err := ToSubGraph(gq)
 	if err != nil {
 		t.Error(err)
 	}
 
 	ch := make(chan error)
-	go ProcessGraph(sg, ch, ps)
+	go ProcessGraph(sg, ch)
 	err = <-ch
 	if err != nil {
 		t.Error(err)
@@ -279,7 +279,7 @@ func TestProcessGraph(t *testing.T) {
 }
 
 func TestToJson(t *testing.T) {
-	dir, ps := populateGraph(t)
+	dir, _ := populateGraph(t)
 	defer os.RemoveAll(dir)
 
 	// Alright. Now we have everything set up. Let's create the query.
@@ -300,13 +300,13 @@ func TestToJson(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-	sg, err := ToSubGraph(gq, ps)
+	sg, err := ToSubGraph(gq)
 	if err != nil {
 		t.Error(err)
 	}
 
 	ch := make(chan error)
-	go ProcessGraph(sg, ch, ps)
+	go ProcessGraph(sg, ch)
 	err = <-ch
 	if err != nil {
 		t.Error(err)
diff --git a/server/main.go b/server/main.go
index b3eeec0c4aeb08f60ab089d9e0cf99cd701c73c1..548b530cba3db8ee11af06286607aa833b69cbd9 100644
--- a/server/main.go
+++ b/server/main.go
@@ -41,10 +41,15 @@ import (
 var glog = x.Log("server")
 
 var postingDir = flag.String("postings", "", "Directory to store posting lists")
+var uidDir = flag.String("uids", "", "XID UID posting lists directory")
 var mutationDir = flag.String("mutations", "", "Directory to store mutations")
 var port = flag.String("port", "8080", "Port to run server on.")
 var numcpu = flag.Int("numCpu", runtime.NumCPU(),
 	"Number of cores to be used by the process")
+var instanceIdx = flag.Uint64("instanceIdx", 0,
+	"serves only entities whose Fingerprint % numInstance == instanceIdx.")
+var workers = flag.String("workers", "",
+	"Comma separated list of IP addresses of workers")
 
 func addCorsHeaders(w http.ResponseWriter) {
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -74,71 +79,69 @@ func mutationHandler(mu *gql.Mutation) error {
 	return nil
 }
 
-func queryHandler(ps *store.Store) http.HandlerFunc {
-	return func(w http.ResponseWriter, r *http.Request) {
-		addCorsHeaders(w)
-		if r.Method == "OPTIONS" {
-			return
-		}
-		if r.Method != "POST" {
-			x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
-			return
-		}
-
-		var l query.Latency
-		l.Start = time.Now()
-		defer r.Body.Close()
-		q, err := ioutil.ReadAll(r.Body)
-		if err != nil || len(q) == 0 {
-			x.Err(glog, err).Error("While reading query")
-			x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
-			return
-		}
+func queryHandler(w http.ResponseWriter, r *http.Request) {
+	addCorsHeaders(w)
+	if r.Method == "OPTIONS" {
+		return
+	}
+	if r.Method != "POST" {
+		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
+		return
+	}
 
-		glog.WithField("q", string(q)).Debug("Query received.")
-		gq, mu, err := gql.Parse(string(q))
-		if err != nil {
-			x.Err(glog, err).Error("While parsing query")
-			x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
-			return
-		}
-		mutationHandler(mu)
+	var l query.Latency
+	l.Start = time.Now()
+	defer r.Body.Close()
+	q, err := ioutil.ReadAll(r.Body)
+	if err != nil || len(q) == 0 {
+		x.Err(glog, err).Error("While reading query")
+		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
+		return
+	}
 
-		sg, err := query.ToSubGraph(gq, ps)
-		if err != nil {
-			x.Err(glog, err).Error("While conversion to internal format")
-			x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
-			return
-		}
-		l.Parsing = time.Since(l.Start)
-		glog.WithField("q", string(q)).Debug("Query parsed.")
+	glog.WithField("q", string(q)).Debug("Query received.")
+	gq, mu, err := gql.Parse(string(q))
+	if err != nil {
+		x.Err(glog, err).Error("While parsing query")
+		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
+		return
+	}
+	mutationHandler(mu)
 
-		rch := make(chan error)
-		go query.ProcessGraph(sg, rch, ps)
-		err = <-rch
-		if err != nil {
-			x.Err(glog, err).Error("While executing query")
-			x.SetStatus(w, x.E_ERROR, err.Error())
-			return
-		}
-		l.Processing = time.Since(l.Start) - l.Parsing
-		glog.WithField("q", string(q)).Debug("Graph processed.")
-		js, err := sg.ToJson(&l)
-		if err != nil {
-			x.Err(glog, err).Error("While converting to Json.")
-			x.SetStatus(w, x.E_ERROR, err.Error())
-			return
-		}
-		glog.WithFields(logrus.Fields{
-			"total":   time.Since(l.Start),
-			"parsing": l.Parsing,
-			"process": l.Processing,
-			"json":    l.Json,
-		}).Info("Query Latencies")
-
-		w.Header().Set("Content-Type", "application/json")
-		fmt.Fprint(w, string(js))
+	sg, err := query.ToSubGraph(gq)
+	if err != nil {
+		x.Err(glog, err).Error("While conversion to internal format")
+		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
+		return
 	}
+	l.Parsing = time.Since(l.Start)
+	glog.WithField("q", string(q)).Debug("Query parsed.")
+
+	rch := make(chan error)
+	go query.ProcessGraph(sg, rch)
+	err = <-rch
+	if err != nil {
+		x.Err(glog, err).Error("While executing query")
+		x.SetStatus(w, x.E_ERROR, err.Error())
+		return
+	}
+	l.Processing = time.Since(l.Start) - l.Parsing
+	glog.WithField("q", string(q)).Debug("Graph processed.")
+	js, err := sg.ToJson(&l)
+	if err != nil {
+		x.Err(glog, err).Error("While converting to Json.")
+		x.SetStatus(w, x.E_ERROR, err.Error())
+		return
+	}
+	glog.WithFields(logrus.Fields{
+		"total":   time.Since(l.Start),
+		"parsing": l.Parsing,
+		"process": l.Processing,
+		"json":    l.Json,
+	}).Info("Query Latencies")
+
+	w.Header().Set("Content-Type", "application/json")
+	fmt.Fprint(w, string(js))
 }
 
 func main() {
@@ -162,12 +165,24 @@ func main() {
 	clog.Init()
 	defer clog.Close()
 
+	addrs := strings.Split(*workers, ",")
+
 	posting.Init(clog)
-	worker.Init(ps)
+	if *instanceIdx != 0 {
+		worker.Init(ps, nil, addrs)
+		uid.Init(nil)
+	} else {
+		uidStore := new(store.Store)
+		uidStore.Init(*uidDir)
+		defer uidStore.Close()
+		// Only server instance 0 will have uidStore
+		worker.Init(ps, uidStore, addrs)
+		uid.Init(uidStore)
+	}
+
 	worker.Connect()
-	uid.Init(ps)
 
-	http.HandleFunc("/query", queryHandler(ps))
+	http.HandleFunc("/query", queryHandler)
 	glog.WithField("port", *port).Info("Listening for requests...")
 	if err := http.ListenAndServe(":"+*port, nil); err != nil {
 		x.Err(glog, err).Fatal("ListenAndServe")
diff --git a/server/main_test.go b/server/main_test.go
index 2d13e5515fb16f5a02b3c3677e83d16d7f104241..78c20da95f409349df5aed4163286ba06bb8b159 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
 	clog.Init()
 
 	posting.Init(clog)
-	worker.Init(ps)
+	worker.Init(ps, nil, nil)
 	uid.Init(ps)
 	loader.Init(ps, ps)
 
@@ -85,7 +85,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) {
 }
 
 func TestQuery(t *testing.T) {
-	dir1, dir2, ps, clog, err := prepare()
+	dir1, dir2, _, clog, err := prepare()
 	if err != nil {
 		t.Error(err)
 		return
@@ -98,7 +98,7 @@ func TestQuery(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	g, err := query.ToSubGraph(gq, ps)
+	g, err := query.ToSubGraph(gq)
 	if err != nil {
 		t.Error(err)
 		return
@@ -142,7 +142,7 @@ func TestQuery(t *testing.T) {
 	}
 
 	ch := make(chan error)
-	go query.ProcessGraph(g, ch, ps)
+	go query.ProcessGraph(g, ch)
 	if err := <-ch; err != nil {
 		t.Error(err)
 		return
@@ -180,7 +180,7 @@ var q1 = `
 `
 
 func BenchmarkQuery(b *testing.B) {
-	dir1, dir2, ps, clog, err := prepare()
+	dir1, dir2, _, clog, err := prepare()
 	if err != nil {
 		b.Error(err)
 		return
@@ -194,14 +194,14 @@ func BenchmarkQuery(b *testing.B) {
 		b.Error(err)
 		return
 	}
-	g, err := query.ToSubGraph(gq, ps)
+	g, err := query.ToSubGraph(gq)
 	if err != nil {
 		b.Error(err)
 		return
 	}
 
 	ch := make(chan error)
-	go query.ProcessGraph(g, ch, ps)
+	go query.ProcessGraph(g, ch)
 	if err := <-ch; err != nil {
 		b.Error(err)
 		return
diff --git a/worker/worker.go b/worker/worker.go
index 9b6e2b91fed605a92d587e137a94c1982cbd449f..20ee65a8d3401d56a1196c43e6c2af77547ea4ab 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -5,7 +5,6 @@ import (
 	"io"
 	"net"
 	"net/rpc"
-	"strings"
 
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/posting"
@@ -15,17 +14,18 @@ import (
 	"github.com/google/flatbuffers/go"
 )
 
-var workers = flag.String("workers", "",
-	"Comma separated list of IP addresses of workers")
 var workerPort = flag.String("workerport", ":12345",
 	"Port used by worker for internal communication.")
 
 var glog = x.Log("worker")
-var dataStore *store.Store
+var dataStore, xiduidStore *store.Store
 var pools []*conn.Pool
+var addrs []string
 
-func Init(ps *store.Store) {
+func Init(ps, xuStore *store.Store, workerList []string) {
 	dataStore = ps
+	xiduidStore = xuStore
+	addrs = workerList
 }
 
 func Connect() {
@@ -37,7 +37,6 @@ func Connect() {
 		glog.Fatal(err)
 	}
 
-	addrs := strings.Split(*workers, ",")
 	for _, addr := range addrs {
 		if len(addr) == 0 {
 			continue
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 06f4be87f02727da500464a9b077a47e34b3c8bf..35ed56842824af45d10239d741e4f8e9335e495d 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
 	defer clog.Close()
 
 	posting.Init(clog)
-	Init(ps)
+	Init(ps, nil, nil)
 
 	edge := x.DirectedEdge{
 		ValueId: 23,