Skip to content
Snippets Groups Projects
Commit 9f50b864 authored by Ashwin's avatar Ashwin
Browse files

changes to server to accomodate passing of pstore

parent 4964433f
No related branches found
No related tags found
No related merge requests found
......@@ -262,8 +262,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
}
}
func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID)
func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID, pstore)
if err != nil {
return nil, err
}
......@@ -271,11 +271,11 @@ func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
return sg, nil
}
func newGraph(euid uint64, exid string) (*SubGraph, error) {
func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
if len(exid) > 0 {
u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default
u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default
if err != nil {
x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id")
......@@ -401,11 +401,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
return sorted, nil
}
func ProcessGraph(sg *SubGraph, rch chan error) {
func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
var err error
if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query)
sg.result, err = posting.ProcessTask(sg.query, pstore)
if err != nil {
x.Err(glog, err).Error("While processing task.")
rch <- err
......@@ -444,7 +444,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]
child.query = createTaskQuery(child.Attr, sorted)
go ProcessGraph(child, childchan)
go ProcessGraph(child, childchan, pstore)
}
// Now get all the results back.
......
......@@ -88,7 +88,16 @@ func checkSingleValue(t *testing.T, child *SubGraph,
func TestNewGraph(t *testing.T) {
var ex uint64
ex = 101
sg, err := newGraph(ex, "")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
ps := new(store.Store)
ps.Init(dir)
sg, err := newGraph(ex, "", ps)
if err != nil {
t.Error(err)
}
......@@ -111,12 +120,12 @@ func TestNewGraph(t *testing.T) {
}
}
func populateGraph(t *testing.T) string {
func populateGraph(t *testing.T) (string, *store.Store) {
// logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return ""
return "", nil
}
ps := new(store.Store)
......@@ -124,7 +133,7 @@ func populateGraph(t *testing.T) string {
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
posting.Init(ps, clog)
posting.Init(clog)
// So, user we're interested in has uid: 1.
// She has 4 friends: 23, 24, 25, 31, and 101
......@@ -133,48 +142,48 @@ func populateGraph(t *testing.T) string {
Source: "testing",
Timestamp: time.Now(),
}
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 24
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 25
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 31
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 101
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
// Now let's add a few properties for the main user.
edge.Value = "Michonne"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"), ps))
edge.Value = "female"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"), ps))
edge.Value = "alive"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"), ps))
// Now let's add a name for each of the friends, except 101.
edge.Value = "Rick Grimes"
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"), ps))
edge.Value = "Glenn Rhee"
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"), ps))
edge.Value = "Daryl Dixon"
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"), ps))
edge.Value = "Andrea"
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"), ps))
return dir
return dir, ps
}
func TestProcessGraph(t *testing.T) {
dir := populateGraph(t)
dir, ps := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -194,13 +203,13 @@ func TestProcessGraph(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq)
sg, err := ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch)
go ProcessGraph(sg, ch, ps)
err = <-ch
if err != nil {
t.Error(err)
......@@ -265,7 +274,7 @@ func TestProcessGraph(t *testing.T) {
}
func TestToJson(t *testing.T) {
dir := populateGraph(t)
dir, ps := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -286,13 +295,13 @@ func TestToJson(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq)
sg, err := ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch)
go ProcessGraph(sg, ch, ps)
err = <-ch
if err != nil {
t.Error(err)
......
......@@ -51,67 +51,69 @@ func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Connection", "close")
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
func queryHandler(ps *store.Store) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
sg, err := query.ToSubGraph(gq, ps)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch, ps)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
func main() {
......@@ -135,9 +137,9 @@ func main() {
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
posting.Init(clog)
http.HandleFunc("/query", queryHandler)
http.HandleFunc("/query", queryHandler(ps))
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(":"+*port, nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
......
......@@ -43,34 +43,34 @@ var q0 = `
}
`
func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) {
func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr error) {
var err error
dir1, err = ioutil.TempDir("", "storetest_")
if err != nil {
return "", "", nil, err
return "", "", nil, nil, err
}
ps := new(store.Store)
ps = new(store.Store)
ps.Init(dir1)
dir2, err = ioutil.TempDir("", "storemuts_")
if err != nil {
return dir1, "", nil, err
return dir1, "", nil, nil, err
}
clog = commit.NewLogger(dir2, "mutations", 50<<20)
clog.Init()
posting.Init(ps, clog)
posting.Init(clog)
f, err := os.Open("testdata.nq")
if err != nil {
return dir1, dir2, clog, err
return dir1, dir2, nil, clog, err
}
defer f.Close()
_, err = loader.HandleRdfReader(f, 0, 1)
_, err = loader.HandleRdfReader(f, 0, 1, ps, ps)
if err != nil {
return dir1, dir2, clog, err
return dir1, dir2, nil, clog, err
}
return dir1, dir2, clog, nil
return dir1, dir2, ps, clog, nil
}
func closeAll(dir1, dir2 string, clog *commit.Logger) {
......@@ -80,7 +80,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) {
}
func TestQuery(t *testing.T) {
dir1, dir2, clog, err := prepare()
dir1, dir2, ps, clog, err := prepare()
if err != nil {
t.Error(err)
return
......@@ -93,7 +93,7 @@ func TestQuery(t *testing.T) {
t.Error(err)
return
}
g, err := query.ToSubGraph(gq)
g, err := query.ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
return
......@@ -137,7 +137,7 @@ func TestQuery(t *testing.T) {
}
ch := make(chan error)
go query.ProcessGraph(g, ch)
go query.ProcessGraph(g, ch, ps)
if err := <-ch; err != nil {
t.Error(err)
return
......@@ -175,7 +175,7 @@ var q1 = `
`
func BenchmarkQuery(b *testing.B) {
dir1, dir2, clog, err := prepare()
dir1, dir2, ps, clog, err := prepare()
if err != nil {
b.Error(err)
return
......@@ -189,14 +189,14 @@ func BenchmarkQuery(b *testing.B) {
b.Error(err)
return
}
g, err := query.ToSubGraph(gq)
g, err := query.ToSubGraph(gq, ps)
if err != nil {
b.Error(err)
return
}
ch := make(chan error)
go query.ProcessGraph(g, ch)
go query.ProcessGraph(g, ch, ps)
if err := <-ch; err != nil {
b.Error(err)
return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment