Newer
Older
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
Manish R Jain
committed
"runtime"
Manish R Jain
committed
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
var glog = x.Log("server")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var uidDir = flag.String("uids", "", "XID UID posting lists directory")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
var port = flag.Int("port", 8080, "Port to run server on.")
var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
var instanceIdx = flag.Uint64("instanceIdx", 0,
"serves only entities whose Fingerprint % numInstance == instanceIdx.")
var workers = flag.String("workers", "",
"Comma separated list of IP addresses of workers")
Manish R Jain
committed
var nomutations = flag.Bool("nomutations", false, "Don't allow mutations on this server.")
func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers",
"Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token,"+
"X-Auth-Token, Cache-Control, X-Requested-With")
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Connection", "close")
}
func mutationHandler(mu *gql.Mutation) error {
Manish R Jain
committed
if *nomutations {
return fmt.Errorf("Mutations are forbidden on this server.")
}
r := strings.NewReader(mu.Set)
scanner := bufio.NewScanner(r)
var nquads []rdf.NQuad
for scanner.Scan() {
ln := strings.Trim(scanner.Text(), " \t")
if len(ln) == 0 {
continue
}
nq, err := rdf.Parse(ln)
if err != nil {
glog.WithError(err).Error("While parsing RDF.")
return err
}
nquads = append(nquads, nq)
}
xidToUid := make(map[string]uint64)
for _, nq := range nquads {
if !strings.HasPrefix(nq.Subject, "_uid_:") {
xidToUid[nq.Subject] = 0
}
if len(nq.ObjectId) > 0 && !strings.HasPrefix(nq.ObjectId, "_uid_:") {
xidToUid[nq.ObjectId] = 0
}
}
if len(xidToUid) > 0 {
if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
glog.WithError(err).Error("GetOrAssignUidsOverNetwork")
return err
}
}
var edges []x.DirectedEdge
for _, nq := range nquads {
edge, err := nq.ToEdgeUsing(xidToUid)
if err != nil {
glog.WithField("nquad", nq).WithError(err).
Error("While converting to edge")
return err
}
edges = append(edges, edge)
}
left, err := worker.MutateOverNetwork(edges)
if err != nil {
return err
}
if len(left) > 0 {
glog.WithField("left", len(left)).Error("Some edges couldn't be applied")
for _, e := range left {
glog.WithField("edge", e).Debug("Unable to apply mutation")
}
return fmt.Errorf("Unapplied mutations")
}
return nil
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
// If we have mutations, run them first.
if mu != nil && len(mu.Set) > 0 {
if err = mutationHandler(mu); err != nil {
glog.WithError(err).Error("While handling mutations.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
}
if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) {
x.SetStatus(w, x.E_OK, "Done")
return
}
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
type server struct{}
Pawan Rawal
committed
// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *server) Query(ctx context.Context,
req *graph.Request) (*graph.Response, error) {
resp := new(graph.Response)
glog.Error("While reading query")
// TODO(pawan): Refactor query parsing and graph processing code to a common
glog.WithField("q", req.Query).Debug("Query received.")
gq, _, err := gql.Parse(req.Query)
if err != nil {
x.Err(glog, err).Error("While parsing query")
}
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
glog.WithField("q", req.Query).Debug("Graph processed.")
x.Err(glog, err).Error("While converting to protocol buffer.")
gl.Parsing, gl.Processing, gl.Pb = l.Parsing.String(), l.Processing.String(),
l.ProtocolBuffer.String()
resp.L = gl
Pawan Rawal
committed
// This function register a DGraph grpc server on the address, which is used
// exchanging protocol buffer messages.
Manish R Jain
committed
func runGrpcServer(address string) {
ln, err := net.Listen("tcp", address)
if err != nil {
glog.Fatalf("While running server for client: %v", err)
Manish R Jain
committed
return
}
glog.WithField("address", ln.Addr()).Info("Client Worker listening")
s := grpc.NewServer()
if err = s.Serve(ln); err != nil {
glog.Fatalf("While serving gRpc requests", err)
}
Manish R Jain
committed
return
func main() {
flag.Parse()
if !flag.Parsed() {
glog.Fatal("Unable to parse flags")
}
Manish R Jain
committed
logrus.SetLevel(logrus.InfoLevel)
Manish R Jain
committed
prev := runtime.GOMAXPROCS(numCpus)
glog.WithField("num_cpu", numCpus).WithField("prev_maxprocs", prev).
Manish R Jain
committed
Info("Set max procs to num cpus")
if *port%2 != 0 {
glog.Fatalf("Port should be an even number: %v", *port)
}
ps := new(store.Store)
ps.Init(*postingDir)
defer ps.Close()
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
clog.SyncEvery = 1
clog.Init()
defer clog.Close()
addrs := strings.Split(*workers, ",")
lenAddr := uint64(len(addrs))
if lenAddr == 0 {
// If no worker is specified, then we're it.
lenAddr = 1
}
worker.Init(ps, nil, *instanceIdx, lenAddr)
uid.Init(nil)
} else {
uidStore := new(store.Store)
uidStore.Init(*uidDir)
defer uidStore.Close()
// Only server instance 0 will have uidStore
worker.Init(ps, uidStore, *instanceIdx, lenAddr)
worker.Connect(addrs)
Manish R Jain
committed
go runGrpcServer(fmt.Sprintf(":%d", *port+1))
Manish R Jain
committed
Manish R Jain
committed
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
}
}