Skip to content
Snippets Groups Projects
Commit 918a8d91 authored by Manish R Jain's avatar Manish R Jain
Browse files

Ability to parse RDF stream. Convert NQuad to DirectedEdge

parent f2a1d876
No related branches found
No related tags found
No related merge requests found
...@@ -17,9 +17,15 @@ ...@@ -17,9 +17,15 @@
package rdf package rdf
import ( import (
"bufio"
"fmt" "fmt"
"io"
"strings"
"time"
"github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x"
) )
type NQuad struct { type NQuad struct {
...@@ -31,6 +37,27 @@ type NQuad struct { ...@@ -31,6 +37,27 @@ type NQuad struct {
Language string Language string
} }
func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
sid, err := uid.GetOrAssign(nq.Subject)
if err != nil {
return result, err
}
result.Entity = sid
if len(nq.ObjectId) > 0 {
oid, err := uid.GetOrAssign(nq.ObjectId)
if err != nil {
return result, err
}
result.ValueId = oid
} else {
result.Value = nq.ObjectValue
}
result.Attribute = nq.Predicate
result.Source = nq.Label
result.Timestamp = time.Now()
return result, nil
}
func stripBracketsIfPresent(val string) string { func stripBracketsIfPresent(val string) string {
if val[0] != '<' { if val[0] != '<' {
return val return val
...@@ -94,3 +121,33 @@ func Parse(line string) (rnq NQuad, rerr error) { ...@@ -94,3 +121,33 @@ func Parse(line string) (rnq NQuad, rerr error) {
} }
return rnq, nil return rnq, nil
} }
func isNewline(r rune) bool {
return r == '\n' || r == '\r'
}
func ParseStream(reader io.Reader, cnq chan NQuad, done chan error) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
line = strings.Trim(line, " \t")
if len(line) == 0 {
continue
}
glog.Debugf("Got line: %q", line)
nq, err := Parse(line)
if err != nil {
x.Err(glog, err).Errorf("While parsing: %q", line)
done <- err
return
}
cnq <- nq
}
if err := scanner.Err(); err != nil {
x.Err(glog, err).Error("While scanning input")
done <- err
return
}
done <- nil
}
...@@ -18,6 +18,7 @@ package rdf ...@@ -18,6 +18,7 @@ package rdf
import ( import (
"reflect" "reflect"
"strings"
"testing" "testing"
) )
...@@ -197,3 +198,36 @@ func TestLex(t *testing.T) { ...@@ -197,3 +198,36 @@ func TestLex(t *testing.T) {
} }
} }
} }
func TestParseStream(t *testing.T) {
cnq := make(chan NQuad, 10)
done := make(chan error)
data := `
<alice> <follows> <bob> .
<bob> <follows> <fred> .
<bob> <status> "cool_person" .
<charlie> <follows> <bob> .
<charlie> <follows> <dani> .
<dani> <follows> <bob> .
<dani> <follows> <greg> .
<dani> <status> "cool_person" .
<emily> <follows> <fred> .
<fred> <follows> <greg> .
<greg> <status> "cool_person" .
`
go ParseStream(strings.NewReader(data), cnq, done)
Loop:
for {
select {
case nq := <-cnq:
t.Logf("Got nquad: %v", nq)
case err := <-done:
if err != nil {
t.Errorf("While parsing data: %v", err)
}
break Loop
}
}
}
...@@ -30,7 +30,6 @@ type Status struct { ...@@ -30,7 +30,6 @@ type Status struct {
type DirectedEdge struct { type DirectedEdge struct {
Entity uint64 Entity uint64
EntityEid string
Attribute string Attribute string
Value interface{} Value interface{}
ValueId uint64 ValueId uint64
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment