Skip to content
Snippets Groups Projects
Commit 799450d6 authored by Manish R Jain's avatar Manish R Jain
Browse files

Merge with master

parents 160e1bfd 0deabbb5
No related branches found
No related tags found
No related merge requests found
......@@ -26,7 +26,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
......@@ -263,8 +262,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
}
}
func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID, pstore)
func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID)
if err != nil {
return nil, err
}
......@@ -272,11 +271,12 @@ func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
return sg, nil
}
func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
func newGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
if len(exid) > 0 {
u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default
// instanceIdx = 0, numInstances = 1 by default
u, err := uid.GetOrAssign(exid, 0, 1)
if err != nil {
x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id")
......@@ -402,7 +402,7 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
return sorted, nil
}
func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
func ProcessGraph(sg *SubGraph, rch chan error) {
var err error
if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions.
......@@ -445,7 +445,7 @@ func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]
child.query = createTaskQuery(child.Attr, sorted)
go ProcessGraph(child, childchan, pstore)
go ProcessGraph(child, childchan)
}
// Now get all the results back.
......
......@@ -98,12 +98,12 @@ func TestNewGraph(t *testing.T) {
ps := new(store.Store)
ps.Init(dir)
sg, err := newGraph(ex, "", ps)
sg, err := newGraph(ex, "")
if err != nil {
t.Error(err)
}
worker.Init(ps)
worker.Init(ps, nil, nil)
uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
......@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
ps := new(store.Store)
ps.Init(dir)
worker.Init(ps)
worker.Init(ps, nil, nil)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
......@@ -188,7 +188,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
}
func TestProcessGraph(t *testing.T) {
dir, ps := populateGraph(t)
dir, _ := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -208,13 +208,13 @@ func TestProcessGraph(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq, ps)
sg, err := ToSubGraph(gq)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch, ps)
go ProcessGraph(sg, ch)
err = <-ch
if err != nil {
t.Error(err)
......@@ -279,7 +279,7 @@ func TestProcessGraph(t *testing.T) {
}
func TestToJson(t *testing.T) {
dir, ps := populateGraph(t)
dir, _ := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -300,13 +300,13 @@ func TestToJson(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq, ps)
sg, err := ToSubGraph(gq)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch, ps)
go ProcessGraph(sg, ch)
err = <-ch
if err != nil {
t.Error(err)
......
......@@ -41,10 +41,15 @@ import (
var glog = x.Log("server")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var uidDir = flag.String("uids", "", "XID UID posting lists directory")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
var port = flag.String("port", "8080", "Port to run server on.")
var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
var instanceIdx = flag.Uint64("instanceIdx", 0,
"serves only entities whose Fingerprint % numInstance == instanceIdx.")
var workers = flag.String("workers", "",
"Comma separated list of IP addresses of workers")
func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
......@@ -74,71 +79,69 @@ func mutationHandler(mu *gql.Mutation) error {
return nil
}
func queryHandler(ps *store.Store) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, mu, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
mutationHandler(mu)
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
sg, err := query.ToSubGraph(gq, ps)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
glog.WithField("q", string(q)).Debug("Query received.")
gq, mu, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
mutationHandler(mu)
rch := make(chan error)
go query.ProcessGraph(sg, rch, ps)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
func main() {
......@@ -162,12 +165,24 @@ func main() {
clog.Init()
defer clog.Close()
addrs := strings.Split(*workers, ",")
posting.Init(clog)
worker.Init(ps)
if *instanceIdx != 0 {
worker.Init(ps, nil, addrs)
uid.Init(nil)
} else {
uidStore := new(store.Store)
uidStore.Init(*uidDir)
defer uidStore.Close()
// Only server instance 0 will have uidStore
worker.Init(ps, uidStore, addrs)
uid.Init(uidStore)
}
worker.Connect()
uid.Init(ps)
http.HandleFunc("/query", queryHandler(ps))
http.HandleFunc("/query", queryHandler)
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(":"+*port, nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
......
......@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
clog.Init()
posting.Init(clog)
worker.Init(ps)
worker.Init(ps, nil, nil)
uid.Init(ps)
loader.Init(ps, ps)
......@@ -85,7 +85,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) {
}
func TestQuery(t *testing.T) {
dir1, dir2, ps, clog, err := prepare()
dir1, dir2, _, clog, err := prepare()
if err != nil {
t.Error(err)
return
......@@ -98,7 +98,7 @@ func TestQuery(t *testing.T) {
t.Error(err)
return
}
g, err := query.ToSubGraph(gq, ps)
g, err := query.ToSubGraph(gq)
if err != nil {
t.Error(err)
return
......@@ -142,7 +142,7 @@ func TestQuery(t *testing.T) {
}
ch := make(chan error)
go query.ProcessGraph(g, ch, ps)
go query.ProcessGraph(g, ch)
if err := <-ch; err != nil {
t.Error(err)
return
......@@ -180,7 +180,7 @@ var q1 = `
`
func BenchmarkQuery(b *testing.B) {
dir1, dir2, ps, clog, err := prepare()
dir1, dir2, _, clog, err := prepare()
if err != nil {
b.Error(err)
return
......@@ -194,14 +194,14 @@ func BenchmarkQuery(b *testing.B) {
b.Error(err)
return
}
g, err := query.ToSubGraph(gq, ps)
g, err := query.ToSubGraph(gq)
if err != nil {
b.Error(err)
return
}
ch := make(chan error)
go query.ProcessGraph(g, ch, ps)
go query.ProcessGraph(g, ch)
if err := <-ch; err != nil {
b.Error(err)
return
......
......@@ -5,7 +5,6 @@ import (
"io"
"net"
"net/rpc"
"strings"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
......@@ -15,17 +14,18 @@ import (
"github.com/google/flatbuffers/go"
)
var workers = flag.String("workers", "",
"Comma separated list of IP addresses of workers")
var workerPort = flag.String("workerport", ":12345",
"Port used by worker for internal communication.")
var glog = x.Log("worker")
var dataStore *store.Store
var dataStore, xiduidStore *store.Store
var pools []*conn.Pool
var addrs []string
func Init(ps *store.Store) {
func Init(ps, xuStore *store.Store, workerList []string) {
dataStore = ps
xiduidStore = xuStore
addrs = workerList
}
func Connect() {
......@@ -37,7 +37,6 @@ func Connect() {
glog.Fatal(err)
}
addrs := strings.Split(*workers, ",")
for _, addr := range addrs {
if len(addr) == 0 {
continue
......
......@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
defer clog.Close()
posting.Init(clog)
Init(ps)
Init(ps, nil, nil)
edge := x.DirectedEdge{
ValueId: 23,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment