diff --git a/rdf/parse.go b/rdf/parse.go index 4d38ed2d741ab2c651fc9a837cf4d987c9dd31ab..42bc2904fffdd26e5e8e9b9e1d27f5491c9115d1 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -17,9 +17,15 @@ package rdf import ( + "bufio" "fmt" + "io" + "strings" + "time" "github.com/dgraph-io/dgraph/lex" + "github.com/dgraph-io/dgraph/uid" + "github.com/dgraph-io/dgraph/x" ) type NQuad struct { @@ -31,6 +37,27 @@ type NQuad struct { Language string } +func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { + sid, err := uid.GetOrAssign(nq.Subject) + if err != nil { + return result, err + } + result.Entity = sid + if len(nq.ObjectId) > 0 { + oid, err := uid.GetOrAssign(nq.ObjectId) + if err != nil { + return result, err + } + result.ValueId = oid + } else { + result.Value = nq.ObjectValue + } + result.Attribute = nq.Predicate + result.Source = nq.Label + result.Timestamp = time.Now() + return result, nil +} + func stripBracketsIfPresent(val string) string { if val[0] != '<' { return val @@ -94,3 +121,33 @@ func Parse(line string) (rnq NQuad, rerr error) { } return rnq, nil } + +func isNewline(r rune) bool { + return r == '\n' || r == '\r' +} + +func ParseStream(reader io.Reader, cnq chan NQuad, done chan error) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + line = strings.Trim(line, " \t") + if len(line) == 0 { + continue + } + + glog.Debugf("Got line: %q", line) + nq, err := Parse(line) + if err != nil { + x.Err(glog, err).Errorf("While parsing: %q", line) + done <- err + return + } + cnq <- nq + } + if err := scanner.Err(); err != nil { + x.Err(glog, err).Error("While scanning input") + done <- err + return + } + done <- nil +} diff --git a/rdf/parse_test.go b/rdf/parse_test.go index af2433e3f2f0b1ddf9af4def3baae0cbaeb335b7..f621770e6b74cb2a5568b108c2ea1502e206cf31 100644 --- a/rdf/parse_test.go +++ b/rdf/parse_test.go @@ -18,6 +18,7 @@ package rdf import ( "reflect" + "strings" "testing" ) @@ -197,3 +198,36 @@ func TestLex(t *testing.T) { } } } + +func TestParseStream(t *testing.T) { + cnq := make(chan NQuad, 10) + done := make(chan error) + + data := ` + + <alice> <follows> <bob> . + <bob> <follows> <fred> . + <bob> <status> "cool_person" . + <charlie> <follows> <bob> . + <charlie> <follows> <dani> . + <dani> <follows> <bob> . + <dani> <follows> <greg> . + <dani> <status> "cool_person" . + <emily> <follows> <fred> . + <fred> <follows> <greg> . + <greg> <status> "cool_person" . + ` + go ParseStream(strings.NewReader(data), cnq, done) +Loop: + for { + select { + case nq := <-cnq: + t.Logf("Got nquad: %v", nq) + case err := <-done: + if err != nil { + t.Errorf("While parsing data: %v", err) + } + break Loop + } + } +} diff --git a/x/x.go b/x/x.go index 8c1a87d80915d14156ea20e6013b09e137d0ecc0..5873177eb281122a83c966b73aa79e0af3a7ef4a 100644 --- a/x/x.go +++ b/x/x.go @@ -30,7 +30,6 @@ type Status struct { type DirectedEdge struct { Entity uint64 - EntityEid string Attribute string Value interface{} ValueId uint64