/*
 * Copyright 2015 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"bufio"
	"flag"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"runtime"
	"strings"
	"time"

	"golang.org/x/net/context"

	"google.golang.org/grpc"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/gql"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/query"
	"github.com/dgraph-io/dgraph/query/graph"
	"github.com/dgraph-io/dgraph/rdf"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/uid"
	"github.com/dgraph-io/dgraph/worker"
	"github.com/dgraph-io/dgraph/x"
)


// Command-line flags for this server. Each is registered on the default
// flag set at package initialisation and is read after flag.Parse in main.
//
// NOTE(review): glog is used throughout this file but its declaration is not
// present in the visible source — confirm it is still declared in this
// package.

// postingDir is the directory handed to the posting-list store.
var postingDir = flag.String("postings", "", "Directory to store posting lists")

// uidDir is the directory handed to the UID store (opened only when
// instanceIdx is 0).
var uidDir = flag.String("uids", "", "XID UID posting lists directory")

// mutationDir is the directory handed to the commit logger.
var mutationDir = flag.String("mutations", "", "Directory to store mutations")

// port is the HTTP port; main requires it to be even and runs the gRPC
// server on port+1.
var port = flag.Int("port", 8080, "Port to run server on.")

// numcpu defaults to the number of logical CPUs reported by the runtime.
var numcpu = flag.Int("numCpu", runtime.NumCPU(),
	"Number of cores to be used by the process")

// instanceIdx identifies this instance among the workers.
var instanceIdx = flag.Uint64("instanceIdx", 0,
	"serves only entities whose Fingerprint % numInstance == instanceIdx.")

// workers is split on "," in main to obtain the worker addresses.
var workers = flag.String("workers", "",
	"Comma separated list of IP addresses of workers")

// nomutations, when set, makes mutationHandler reject every mutation.
var nomutations = flag.Bool("nomutations", false, "Don't allow mutations on this server.")

// addCorsHeaders sets the cross-origin response headers on w: any origin is
// allowed, POST and OPTIONS are the permitted methods, and credentials are
// allowed. It also sets "Connection: close" on the response.
//
// It must be called before the response body or status is written, since
// header changes made afterwards are not sent.
func addCorsHeaders(w http.ResponseWriter) {
	h := w.Header()
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Methods", "POST, OPTIONS")
	h.Set("Access-Control-Allow-Headers",
		"Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token,"+
			"X-Auth-Token, Cache-Control, X-Requested-With")
	h.Set("Access-Control-Allow-Credentials", "true")
	h.Set("Connection", "close")
}

func mutationHandler(mu *gql.Mutation) error {
	if *nomutations {
		return fmt.Errorf("Mutations are forbidden on this server.")
	}

	r := strings.NewReader(mu.Set)
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		ln := strings.Trim(scanner.Text(), " \t")
		if len(ln) == 0 {
			continue
		}
		if err != nil {
			glog.WithError(err).Error("While parsing RDF.")
			return err
		}
		nquads = append(nquads, nq)
	}

	xidToUid := make(map[string]uint64)
	for _, nq := range nquads {
		if !strings.HasPrefix(nq.Subject, "_uid_:") {
		if len(nq.ObjectId) > 0 && !strings.HasPrefix(nq.ObjectId, "_uid_:") {
	if len(xidToUid) > 0 {
		if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
			glog.WithError(err).Error("GetOrAssignUidsOverNetwork")
			return err
		}
	}

	var edges []x.DirectedEdge
	for _, nq := range nquads {
		edge, err := nq.ToEdgeUsing(xidToUid)
		if err != nil {
			glog.WithField("nquad", nq).WithError(err).
				Error("While converting to edge")
			return err
		}
		edges = append(edges, edge)
	}

	left, err := worker.MutateOverNetwork(edges)
	if err != nil {
		return err
	}
	if len(left) > 0 {
		glog.WithField("left", len(left)).Error("Some edges couldn't be applied")
		for _, e := range left {
			glog.WithField("edge", e).Debug("Unable to apply mutation")
		}
		return fmt.Errorf("Unapplied mutations")
func queryHandler(w http.ResponseWriter, r *http.Request) {
	addCorsHeaders(w)
	if r.Method == "OPTIONS" {
		return
	if r.Method != "POST" {
		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
		return
	}
	var l query.Latency
	l.Start = time.Now()
	defer r.Body.Close()
	q, err := ioutil.ReadAll(r.Body)
	if err != nil || len(q) == 0 {
		x.Err(glog, err).Error("While reading query")
		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
		return
	}
	glog.WithField("q", string(q)).Debug("Query received.")
Manish R Jain's avatar
Manish R Jain committed
	gq, mu, err := gql.Parse(string(q))
	if err != nil {
		x.Err(glog, err).Error("While parsing query")
		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
		return
	}

	// If we have mutations, run them first.
	if mu != nil && len(mu.Set) > 0 {
		if err = mutationHandler(mu); err != nil {
			glog.WithError(err).Error("While handling mutations.")
			x.SetStatus(w, x.E_ERROR, err.Error())
			return
		}
	}

	if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) {
		x.SetStatus(w, x.E_OK, "Done")
		return
	}
	sg, err := query.ToSubGraph(gq)
	if err != nil {
		x.Err(glog, err).Error("While conversion to internal format")
		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
		return
	l.Parsing = time.Since(l.Start)
	glog.WithField("q", string(q)).Debug("Query parsed.")

	rch := make(chan error)
Manish R Jain's avatar
Manish R Jain committed
	go query.ProcessGraph(sg, rch, time.Minute)
	err = <-rch
	if err != nil {
		x.Err(glog, err).Error("While executing query")
		x.SetStatus(w, x.E_ERROR, err.Error())
		return
	}
	l.Processing = time.Since(l.Start) - l.Parsing
	glog.WithField("q", string(q)).Debug("Graph processed.")
	js, err := sg.ToJson(&l)
	if err != nil {
		x.Err(glog, err).Error("While converting to Json.")
		x.SetStatus(w, x.E_ERROR, err.Error())
		return
	}
	glog.WithFields(logrus.Fields{
		"total":   time.Since(l.Start),
		"parsing": l.Parsing,
		"process": l.Processing,
		"json":    l.Json,
	}).Info("Query Latencies")

	w.Header().Set("Content-Type", "application/json")
	fmt.Fprint(w, string(js))
Pawan Rawal's avatar
Pawan Rawal committed
// server is used to implement graph.DGraphServer
// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *server) Query(ctx context.Context,
	req *graph.Request) (*graph.Response, error) {
	resp := new(graph.Response)
Pawan Rawal's avatar
Pawan Rawal committed
	if len(req.Query) == 0 {
		glog.Error("While reading query")
Pawan Rawal's avatar
Pawan Rawal committed
		return resp, fmt.Errorf("Empty query")
Pawan Rawal's avatar
Pawan Rawal committed
	var l query.Latency
	l.Start = time.Now()
	// TODO(pawan): Refactor query parsing and graph processing code to a common
Pawan Rawal's avatar
Pawan Rawal committed
	// function used by Query and queryHandler
Pawan Rawal's avatar
Pawan Rawal committed
	glog.WithField("q", req.Query).Debug("Query received.")
	gq, mu, err := gql.Parse(req.Query)
Pawan Rawal's avatar
Pawan Rawal committed
	if err != nil {
		x.Err(glog, err).Error("While parsing query")
Pawan Rawal's avatar
Pawan Rawal committed
		return resp, err
	// If we have mutations, run them first.
	if mu != nil && len(mu.Set) > 0 {
		if err = mutationHandler(mu); err != nil {
			glog.WithError(err).Error("While handling mutations.")
			return resp, err
		}
	}

	if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) {
		return resp, err
	}

Pawan Rawal's avatar
Pawan Rawal committed
	sg, err := query.ToSubGraph(gq)
	if err != nil {
		x.Err(glog, err).Error("While conversion to internal format")
Pawan Rawal's avatar
Pawan Rawal committed
		return resp, err
Pawan Rawal's avatar
Pawan Rawal committed
	l.Parsing = time.Since(l.Start)
Pawan Rawal's avatar
Pawan Rawal committed
	glog.WithField("q", req.Query).Debug("Query parsed.")
Pawan Rawal's avatar
Pawan Rawal committed

	rch := make(chan error)
	go query.ProcessGraph(sg, rch, time.Minute)
Pawan Rawal's avatar
Pawan Rawal committed
	err = <-rch
	if err != nil {
		x.Err(glog, err).Error("While executing query")
Pawan Rawal's avatar
Pawan Rawal committed
		return resp, err
Pawan Rawal's avatar
Pawan Rawal committed
	l.Processing = time.Since(l.Start) - l.Parsing
Pawan Rawal's avatar
Pawan Rawal committed
	glog.WithField("q", req.Query).Debug("Graph processed.")
Pawan Rawal's avatar
Pawan Rawal committed

	node, err := sg.ToProtocolBuffer(&l)
Pawan Rawal's avatar
Pawan Rawal committed
	if err != nil {
Pawan Rawal's avatar
Pawan Rawal committed
		x.Err(glog, err).Error("While converting to protocol buffer.")
Pawan Rawal's avatar
Pawan Rawal committed
		return resp, err
	gl := new(graph.Latency)
	gl.Parsing, gl.Processing, gl.Pb = l.Parsing.String(), l.Processing.String(),
		l.ProtocolBuffer.String()
	resp.L = gl
Pawan Rawal's avatar
Pawan Rawal committed
	return resp, err
// This function register a DGraph grpc server on the address, which is used
// exchanging protocol buffer messages.
Pawan Rawal's avatar
Pawan Rawal committed
	ln, err := net.Listen("tcp", address)
	if err != nil {
		glog.Fatalf("While running server for client: %v", err)
Pawan Rawal's avatar
Pawan Rawal committed
	}
	glog.WithField("address", ln.Addr()).Info("Client Worker listening")

Pawan Rawal's avatar
Pawan Rawal committed
	graph.RegisterDGraphServer(s, &server{})
Pawan Rawal's avatar
Pawan Rawal committed
	if err = s.Serve(ln); err != nil {
		glog.Fatalf("While serving gRpc requests", err)
	}
func main() {
	flag.Parse()
	if !flag.Parsed() {
		glog.Fatal("Unable to parse flags")
	}
Manish R Jain's avatar
Manish R Jain committed
	numCpus := *numcpu
Pawan Rawal's avatar
Pawan Rawal committed
	glog.WithField("num_cpu", numCpus).WithField("prev_maxprocs", prev).
Pawan Rawal's avatar
Pawan Rawal committed
	if *port%2 != 0 {
		glog.Fatalf("Port should be an even number: %v", *port)
	}

	ps := new(store.Store)
	ps.Init(*postingDir)
	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
	clog.Init()
	defer clog.Close()
	addrs := strings.Split(*workers, ",")
	lenAddr := uint64(len(addrs))
	if lenAddr == 0 {
		// If no worker is specified, then we're it.
		lenAddr = 1
	}
	posting.Init(clog)
	if *instanceIdx != 0 {
		worker.Init(ps, nil, *instanceIdx, lenAddr)
		uid.Init(nil)
	} else {
		uidStore := new(store.Store)
		uidStore.Init(*uidDir)
		defer uidStore.Close()
		// Only server instance 0 will have uidStore
		worker.Init(ps, uidStore, *instanceIdx, lenAddr)
		uid.Init(uidStore)
	worker.Connect(addrs)
Pawan Rawal's avatar
Pawan Rawal committed
	// Grpc server runs on (port + 1)
	go runGrpcServer(fmt.Sprintf(":%d", *port+1))
	http.HandleFunc("/query", queryHandler)
	glog.WithField("port", *port).Info("Listening for requests...")
Pawan Rawal's avatar
Pawan Rawal committed
	if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
		x.Err(glog, err).Fatal("ListenAndServe")
	}
}