Skip to content
Snippets Groups Projects
main.go 4.24 KiB
Newer Older
  • Learn to ignore specific revisions
    /*
     * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * 		http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package main
    
    import (
    	"flag"
    	"fmt"
    	"io/ioutil"
    	"net/http"
    	"runtime"
    	"time"

    	"github.com/Sirupsen/logrus"
    	"github.com/dgraph-io/dgraph/commit"
    	"github.com/dgraph-io/dgraph/gql"
    	"github.com/dgraph-io/dgraph/posting"
    	"github.com/dgraph-io/dgraph/query"
    	"github.com/dgraph-io/dgraph/store"
    	"github.com/dgraph-io/dgraph/uid"
    	"github.com/dgraph-io/dgraph/worker"
    	"github.com/dgraph-io/dgraph/x"
    )
    
    
    
    // Command-line flags for the server. Each variable is a pointer that holds
    // the flag's default until flag.Parse is called.
    var (
    	// postingDir is the directory used for posting lists ("-postings").
    	postingDir = flag.String("postings", "", "Directory to store posting lists")
    	// mutationDir is the directory used for mutations ("-mutations").
    	mutationDir = flag.String("mutations", "", "Directory to store mutations")
    	// port is the TCP port the HTTP server listens on ("-port").
    	port = flag.String("port", "8080", "Port to run server on.")
    	// numcpu is the number of cores the process may use ("-numCpu"); it
    	// defaults to the number of logical CPUs reported by the runtime.
    	numcpu = flag.Int("numCpu", runtime.NumCPU(),
    		"Number of cores to be used by the process")
    )
    
    // addCorsHeaders sets the CORS-related response headers on w, plus
    // "Connection: close". It must be called before the response body is written
    // for the headers to take effect.
    //
    // NOTE(review): browsers reject a wildcard Access-Control-Allow-Origin when
    // credentials are sent, so the Allow-Credentials header below is likely
    // ineffective for credentialed requests. Confirm the intended policy.
    func addCorsHeaders(w http.ResponseWriter) {
    	h := w.Header()
    	h.Set("Access-Control-Allow-Origin", "*")
    	h.Set("Access-Control-Allow-Methods", "POST, OPTIONS")
    	h.Set("Access-Control-Allow-Headers",
    		"Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token,"+
    			"X-Auth-Token, Cache-Control, X-Requested-With")
    	h.Set("Access-Control-Allow-Credentials", "true")
    	h.Set("Connection", "close")
    }
    
    
    func queryHandler(ps *store.Store) http.HandlerFunc {
    	return func(w http.ResponseWriter, r *http.Request) {
    		addCorsHeaders(w)
    		if r.Method == "OPTIONS" {
    			return
    		}
    		if r.Method != "POST" {
    			x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
    			return
    		}
    
    		var l query.Latency
    		l.Start = time.Now()
    		defer r.Body.Close()
    		q, err := ioutil.ReadAll(r.Body)
    		if err != nil || len(q) == 0 {
    			x.Err(glog, err).Error("While reading query")
    			x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
    			return
    		}
    
    		glog.WithField("q", string(q)).Debug("Query received.")
    
    		gq, _, err := gql.Parse(string(q))
    
    		if err != nil {
    			x.Err(glog, err).Error("While parsing query")
    			x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
    			return
    		}
    		sg, err := query.ToSubGraph(gq, ps)
    		if err != nil {
    			x.Err(glog, err).Error("While conversion to internal format")
    			x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
    			return
    		}
    		l.Parsing = time.Since(l.Start)
    		glog.WithField("q", string(q)).Debug("Query parsed.")
    
    		rch := make(chan error)
    		go query.ProcessGraph(sg, rch, ps)
    		err = <-rch
    		if err != nil {
    			x.Err(glog, err).Error("While executing query")
    			x.SetStatus(w, x.E_ERROR, err.Error())
    			return
    		}
    		l.Processing = time.Since(l.Start) - l.Parsing
    		glog.WithField("q", string(q)).Debug("Graph processed.")
    		js, err := sg.ToJson(&l)
    		if err != nil {
    			x.Err(glog, err).Error("While converting to Json.")
    			x.SetStatus(w, x.E_ERROR, err.Error())
    			return
    		}
    		glog.WithFields(logrus.Fields{
    			"total":   time.Since(l.Start),
    			"parsing": l.Parsing,
    			"process": l.Processing,
    			"json":    l.Json,
    		}).Info("Query Latencies")
    
    		w.Header().Set("Content-Type", "application/json")
    		fmt.Fprint(w, string(js))
    
    }
    
    func main() {
    	flag.Parse()
    	if !flag.Parsed() {
    		glog.Fatal("Unable to parse flags")
    	}
    
    Manish R Jain's avatar
    Manish R Jain committed
    	numCpus := *numcpu
    
    	prev := runtime.GOMAXPROCS(numCpus)
    	glog.WithField("num_cpu", numCpus).
    		WithField("prev_maxprocs", prev).
    		Info("Set max procs to num cpus")
    
    
    	ps := new(store.Store)
    	ps.Init(*postingDir)
    
    	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
    
    	clog.Init()
    	defer clog.Close()
    
    	posting.Init(clog)
    
    	worker.Init(ps)
    
    	http.HandleFunc("/query", queryHandler(ps))
    
    	glog.WithField("port", *port).Info("Listening for requests...")
    	if err := http.ListenAndServe(":"+*port, nil); err != nil {
    
    		x.Err(glog, err).Fatal("ListenAndServe")
    	}
    }