Skip to content
Snippets Groups Projects
Commit eb4f0a0c authored by Manish R Jain's avatar Manish R Jain
Browse files

Handle reading rdf data from files. Use underscores for uid and xid, keeping...

Handle reading rdf data from files. Use underscores for uid and xid, keeping it in sync with internal usage. Output uids as hex.
parent 1addcefa
No related branches found
No related tags found
No related merge requests found
......@@ -97,15 +97,15 @@ func getRoot(l *lex.Lexer) (sg *query.SubGraph, rerr error) {
return nil, fmt.Errorf("Expecting argument va Got: %v", item)
}
if key == "uid" {
if key == "_uid_" {
uid, rerr = strconv.ParseUint(val, 0, 64)
if rerr != nil {
return nil, rerr
}
} else if key == "xid" {
} else if key == "_xid_" {
xid = val
} else {
return nil, fmt.Errorf("Expecting uid or xid. Got: %v", item)
return nil, fmt.Errorf("Expecting _uid_ or _xid_. Got: %v", item)
}
}
if item.Typ != itemRightRound {
......
......@@ -33,12 +33,11 @@ func checkAttr(g *query.SubGraph, attr string) error {
func TestParse(t *testing.T) {
query := `
query {
me(uid:0x0a) {
me(_uid_:0x0a) {
friends {
name
}
gender
age
gender,age
hometown
}
}
......@@ -76,11 +75,10 @@ func TestParse(t *testing.T) {
}
}
/*
func TestParse_error1(t *testing.T) {
query := `
mutation {
me(uid:0x0a) {
me(_uid_:0x0a) {
name
}
}
......@@ -112,7 +110,7 @@ func TestParse_error2(t *testing.T) {
func TestParse_pass1(t *testing.T) {
query := `
{
me(uid:0x0a) {
me(_uid_:0x0a) {
name,
friends(xid:what) { # xid would be ignored.
}
......@@ -137,4 +135,3 @@ func TestParse_pass1(t *testing.T) {
t.Errorf("Expected 0. Got: %v", len(sg.Children))
}
}
*/
......@@ -20,7 +20,6 @@ import (
"fmt"
"unicode/utf8"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/x"
)
......@@ -87,11 +86,6 @@ func (l *Lexer) Errorf(format string,
func (l *Lexer) Emit(t ItemType) {
if t != ItemEOF && l.Pos <= l.Start {
// Let ItemEOF go through.
glog.WithFields(logrus.Fields{
"start": l.Start,
"pos": l.Pos,
"typ": t,
}).Debug("Invalid emit")
return
}
l.Items <- item{
......
......@@ -220,10 +220,11 @@ func TestAddMutation_Value(t *testing.T) {
if p.Uid() != math.MaxUint64 {
t.Errorf("All value uids should go to MaxUint64. Got: %v", p.Uid())
}
var out string
if err := ParseValue(&out, p.ValueBytes()); err != nil {
var iout interface{}
if err := ParseValue(&iout, p.ValueBytes()); err != nil {
t.Error(err)
}
out := iout.(string)
if out != "oh hey there" {
t.Errorf("Expected a value. Got: [%q]", out)
}
......@@ -237,9 +238,10 @@ func TestAddMutation_Value(t *testing.T) {
if ok := ol.Get(&tp, 0); !ok {
t.Error("While retrieving posting")
}
if err := ParseValue(&out, tp.ValueBytes()); err != nil {
if err := ParseValue(&iout, tp.ValueBytes()); err != nil {
t.Error(err)
}
out := iout.(string)
if out != "oh hey there" {
t.Errorf("Expected a value. Got: [%q]", out)
}
......@@ -256,11 +258,11 @@ func TestAddMutation_Value(t *testing.T) {
if ok := ol.Get(&p, 0); !ok {
t.Error("While retrieving posting")
}
var iout int
if err := ParseValue(&iout, p.ValueBytes()); err != nil {
t.Error(err)
}
if iout != 119 {
t.Errorf("Expected 119. Got: %v", iout)
intout := iout.(float64)
if intout != 119 {
t.Errorf("Expected 119. Got: %v", intout)
}
}
......@@ -119,10 +119,11 @@ func TestProcessTask(t *testing.T) {
if ok := r.Values(&tval, 2); !ok {
t.Errorf("Unable to retrieve value")
}
var v string
if err := ParseValue(&v, tval.ValBytes()); err != nil {
var iout interface{}
if err := ParseValue(&iout, tval.ValBytes()); err != nil {
t.Error(err)
}
v := iout.(string)
if v != "photon" {
t.Errorf("Expected photon. Got: %q", v)
}
......
......@@ -163,7 +163,7 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) {
for j := 0; j < ul.UidsLength(); j++ {
uid := ul.Uids(j)
m := make(map[string]interface{})
m["uid"] = uid
m["_uid_"] = fmt.Sprintf("%#x", uid)
if ival, present := cResult[uid]; !present {
l[j] = m
} else {
......@@ -192,15 +192,15 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) {
if pval, present := result[q.Uids(i)]; present {
glog.WithField("prev", pval).
WithField("uid", q.Uids(i)).
WithField("_uid_", q.Uids(i)).
WithField("new", ival).
Fatal("Previous value detected.")
}
m := make(map[string]interface{})
m["uid"] = q.Uids(i)
m["_uid_"] = fmt.Sprintf("%#x", q.Uids(i))
glog.WithFields(logrus.Fields{
"uid": q.Uids(i),
"val": ival,
"_uid_": q.Uids(i),
"val": ival,
}).Debug("Got value")
m[g.Attr] = ival
result[q.Uids(i)] = m
......@@ -226,83 +226,6 @@ func (g *SubGraph) ToJson() (js []byte, rerr error) {
return json.Marshal(r)
}
/*
func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) {
var l []interface{}
for i := 0; i < r.UidsLength(); i++ {
m := make(map[string]interface{})
uid := r.Uids(i)
m["uid"] = uid
if len(sg.Children) > 0 {
for _, cg := range sg.Children {
}
// do something.
}
var v task.Value
if ok := r.Values(&v, i); !ok {
return nil, fmt.Errorf("While reading value at index: %v", i)
}
var i interface{}
if err := posting.ParseValue(i, v.ValBytes()); err != nil {
return nil, err
}
if r.UidsLength() == 0 {
}
}
}
*/
/*
func processChild(result *[]map[string]interface{}, g *SubGraph) error {
ro := flatbuffers.GetUOffsetT(g.result)
r := new(task.Result)
r.Init(g.result, ro)
if r.ValuesLength() > 0 {
var v task.Value
for i := 0; i < r.ValuesLength(); i++ {
if ok := r.Values(&v, i); !ok {
glog.WithField("idx", i).Error("While loading value")
return fmt.Errorf("While parsing value at index: %v", i)
}
var i interface{}
if err := posting.ParseValue(i, v.ValBytes()); err != nil {
x.Log(glog, err).Error("While parsing value")
return err
}
result[i][g.Attr] = i
}
}
if r.UidsLength() > 0 {
rlist := make([]map[string]interface{}, r.UidsLength())
for i := 0; i < r.UidsLength(); i++ {
rlist[i]["uid"] = r.Uids(i)
for _, cg := range g.Children {
if err := processChild(&rlist, cg); err != nil {
x.Log(glog, err).Error("While processing child with attr: %v", cg.Attr)
return err
}
}
}
}
}
*/
/*
func (sg SubGraph) ToJson() (result []byte, rerr error) {
ro := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
r.Init(sg.result, ro)
rlist := make([]map[string]interface{}, r.UidsLength())
for i := 0; i < r.UidsLength(); i++ {
rlist[i]["uid"] = r.Uids(i)
}
}
*/
func NewGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
......@@ -313,7 +236,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
"While GetOrAssign uid from external id")
return nil, err
}
glog.WithField("xid", exid).WithField("uid", u).Debug("GetOrAssign")
glog.WithField("xid", exid).WithField("_uid_", u).Debug("GetOrAssign")
euid = u
}
......
......@@ -19,8 +19,10 @@ package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/gql"
......@@ -35,17 +37,12 @@ var glog = x.Log("rdf")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
var rdfData = flag.String("rdfdata", "", "File containing RDF data")
func rdfHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
defer r.Body.Close()
func handleRdfReader(reader io.Reader) (int, error) {
cnq := make(chan rdf.NQuad, 1000)
done := make(chan error)
go rdf.ParseStream(r.Body, cnq, done)
go rdf.ParseStream(reader, cnq, done)
count := 0
Loop:
......@@ -55,8 +52,7 @@ Loop:
edge, err := nq.ToEdge()
if err != nil {
x.Err(glog, err).WithField("nq", nq).Error("While converting to edge")
x.SetStatus(w, x.E_ERROR, err.Error())
return
return 0, err
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.Get(key)
......@@ -65,12 +61,26 @@ Loop:
case err := <-done:
if err != nil {
x.Err(glog, err).Error("While reading request")
x.SetStatus(w, x.E_ERROR, err.Error())
return
return 0, err
}
break Loop
}
}
return count, nil
}
func rdfHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
defer r.Body.Close()
count, err := handleRdfReader(r.Body)
if err != nil {
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithField("count", count).Debug("RDFs parsed")
x.SetStatus(w, x.E_OK, fmt.Sprintf("%d RDFs parsed", count))
}
......@@ -83,9 +93,9 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil {
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
sg, err := gql.Parse(string(q))
......@@ -102,7 +112,14 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
x.SetStatus(w, x.E_OK, "Successfully ran the query")
js, err := sg.ToJson()
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
func main() {
......@@ -118,6 +135,20 @@ func main() {
ms.Init(*mutationDir)
posting.Init(ps, ms)
if len(*rdfData) > 0 {
f, err := os.Open(*rdfData)
if err != nil {
glog.Fatal(err)
}
defer f.Close()
count, err := handleRdfReader(f)
if err != nil {
glog.Fatal(err)
}
glog.WithField("count", count).Debug("RDFs parsed")
}
http.HandleFunc("/rdf", rdfHandler)
http.HandleFunc("/query", queryHandler)
if err := http.ListenAndServe(":8080", nil); err != nil {
......
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/store"
)
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
}
return path
}
var q0 = `
{
user(_xid_:alice) {
follows {
_xid_
status
}
_xid_
status
}
}
`
func TestQuery(t *testing.T) {
pdir := NewStore(t)
defer os.RemoveAll(pdir)
ps := new(store.Store)
ps.Init(pdir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
posting.Init(ps, ms)
f, err := os.Open("testdata.nq")
if err != nil {
t.Error(err)
return
}
defer f.Close()
count, err := handleRdfReader(f)
if err != nil {
t.Error(err)
return
}
t.Logf("Parsed %v RDFs", count)
// Parse GQL into internal query representation.
g, err := gql.Parse(q0)
if err != nil {
t.Error(err)
return
}
// Test internal query representation.
if len(g.Children) != 3 {
t.Errorf("Expected 3 children. Got: %v", len(g.Children))
}
{
child := g.Children[0]
if child.Attr != "follows" {
t.Errorf("Expected follows. Got: %q", child.Attr)
}
if len(child.Children) != 2 {
t.Errorf("Expected 2 child. Got: %v", len(child.Children))
}
gc := child.Children[0]
if gc.Attr != "_xid_" {
t.Errorf("Expected _xid_. Got: %q", gc.Attr)
}
gc = child.Children[1]
if gc.Attr != "status" {
t.Errorf("Expected status. Got: %q", gc.Attr)
}
}
{
child := g.Children[1]
if child.Attr != "_xid_" {
t.Errorf("Expected _xid_. Got: %q", child.Attr)
}
}
{
child := g.Children[2]
if child.Attr != "status" {
t.Errorf("Expected status. Got: %q", child.Attr)
}
}
ch := make(chan error)
go query.ProcessGraph(g, ch)
if err := <-ch; err != nil {
t.Error(err)
return
}
js, err := g.ToJson()
if err != nil {
t.Error(err)
return
}
fmt.Println(string(js))
}
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment