Skip to content
Snippets Groups Projects
main.go 3.15 KiB
Newer Older
/*
 * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/gql"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/query"
	"github.com/dgraph-io/dgraph/rdf"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

// glog is the package-wide logger, created via x.Log("rdf").
var glog = x.Log("rdf")

// Command-line flags naming the directories handed to the two stores that
// main initializes. Both default to the empty string.
var (
	postingDir  = flag.String("postings", "", "Directory to store posting lists")
	mutationDir = flag.String("mutations", "", "Directory to store mutations")
)

func rdfHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
		return
	}

	defer r.Body.Close()
	cnq := make(chan rdf.NQuad, 1000)
	done := make(chan error)
	go rdf.ParseStream(r.Body, cnq, done)

	count := 0
Loop:
	for {
		select {
		case nq := <-cnq:
			edge, err := nq.ToEdge()
			if err != nil {
				x.Err(glog, err).WithField("nq", nq).Error("While converting to edge")
				x.SetStatus(w, x.E_ERROR, err.Error())
				return
			}
			key := posting.Key(edge.Entity, edge.Attribute)
			plist := posting.Get(key)
			plist.AddMutation(edge, posting.Set)
			count += 1
		case err := <-done:
			if err != nil {
				x.Err(glog, err).Error("While reading request")
				x.SetStatus(w, x.E_ERROR, err.Error())
				return
			}
			break Loop
		}
	}
	glog.WithField("count", count).Debug("RDFs parsed")
	x.SetStatus(w, x.E_OK, fmt.Sprintf("%d RDFs parsed", count))
}

// queryHandler accepts a POST request whose body is a query string. The body
// is read in full, parsed with gql.Parse, and the resulting graph is executed
// by query.ProcessGraph in its own goroutine while the handler waits for the
// outcome on a channel.
//
// Non-POST requests get x.E_INVALID_METHOD; a body that cannot be read or
// parsed gets x.E_INVALID_REQUEST; an execution failure gets x.E_ERROR. On
// success only an x.E_OK status message is written.
func queryHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
		return
	}
	defer r.Body.Close()

	body, readErr := ioutil.ReadAll(r.Body)
	if readErr != nil {
		x.Err(glog, readErr).Error("While reading query")
		x.SetStatus(w, x.E_INVALID_REQUEST, readErr.Error())
		return
	}

	graph, parseErr := gql.Parse(string(body))
	if parseErr != nil {
		x.Err(glog, parseErr).Error("While parsing query")
		x.SetStatus(w, x.E_INVALID_REQUEST, parseErr.Error())
		return
	}

	// Run the query concurrently and block until it reports its result.
	outcome := make(chan error)
	go query.ProcessGraph(graph, outcome)
	if runErr := <-outcome; runErr != nil {
		x.Err(glog, runErr).Error("While executing query")
		x.SetStatus(w, x.E_ERROR, runErr.Error())
		return
	}

	x.SetStatus(w, x.E_OK, "Successfully ran the query")
}

func main() {
	flag.Parse()
	if !flag.Parsed() {
		glog.Fatal("Unable to parse flags")
	}
	logrus.SetLevel(logrus.DebugLevel)

	ps := new(store.Store)
	ps.Init(*postingDir)
	ms := new(store.Store)
	ms.Init(*mutationDir)
	posting.Init(ps, ms)

	http.HandleFunc("/rdf", rdfHandler)
	http.HandleFunc("/query", queryHandler)
	if err := http.ListenAndServe(":8080", nil); err != nil {
		x.Err(glog, err).Fatal("ListenAndServe")
	}
}