/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"runtime"
	"time"

	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/gql"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/query"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/uid"
	"github.com/dgraph-io/dgraph/worker"
	// NOTE(review): this file also uses the identifiers `x` and `logrus`, but
	// no import for either is visible here; their import lines appear to have
	// been lost and must be restored from the upstream file.
)
var glog = x.Log("server")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var xiduidDir = flag.String("xiduid", "", "XID UID posting lists directory")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
Manish R Jain
committed
var port = flag.String("port", "8080", "Port to run server on.")
var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
var instanceIdx = flag.Uint64("instanceIdx", 0,
"serves only entities whose Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1,
"Total number of server instances")
// addCorsHeaders writes the cross-origin (CORS) response headers onto w and
// additionally sets "Connection: close". It only touches the header map; it
// writes neither a status code nor a body.
func addCorsHeaders(w http.ResponseWriter) {
	// Request headers that cross-origin callers are allowed to send.
	const allowedHeaders = "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token," +
		"X-Auth-Token, Cache-Control, X-Requested-With"

	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Access-Control-Allow-Methods", "POST, OPTIONS")
	hdr.Set("Access-Control-Allow-Headers", allowedHeaders)
	hdr.Set("Access-Control-Allow-Credentials", "true")
	hdr.Set("Connection", "close")
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, _, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
func main() {
flag.Parse()
if !flag.Parsed() {
glog.Fatal("Unable to parse flags")
}
Manish R Jain
committed
logrus.SetLevel(logrus.InfoLevel)
Manish R Jain
committed
prev := runtime.GOMAXPROCS(numCpus)
glog.WithField("num_cpu", numCpus).
WithField("prev_maxprocs", prev).
Info("Set max procs to num cpus")
ps := new(store.Store)
ps.Init(*postingDir)
defer ps.Close()
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
clog.SyncEvery = 1
clog.Init()
defer clog.Close()
if *instanceIdx == 0 {
xiduidStore := new(store.Store)
xiduidStore.Init(*xiduidDir)
defer xiduidStore.Close()
worker.Init(ps, xiduidStore) //Only server instance 0 will have xiduidStore
} else {
worker.Init(ps, nil)
}
Manish R Jain
committed
worker.Connect()
Manish R Jain
committed
Manish R Jain
committed
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(":"+*port, nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
}
}