From 32aba405a2e9434239f21b0f0b84185cd154e410 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Mon, 29 Feb 2016 17:39:12 +1100
Subject: [PATCH] Assign uids for xids over the network. Convert NQuads to
 Edges using the assigned uids, and then apply these as mutations over the
 network.

---
 rdf/parse.go          | 39 ++++++++++++++++++++++++++++++++++++
 server/main.go        | 40 ++++++++++++++++++++++++++++++++++++-
 worker/assign.go      | 46 ++++++++++++++++++++++++++++++++++++++++++-
 worker/assign_test.go | 14 ++++++-------
 4 files changed, 130 insertions(+), 9 deletions(-)

diff --git a/rdf/parse.go b/rdf/parse.go
index 789553d4..0a8c8f6a 100644
--- a/rdf/parse.go
+++ b/rdf/parse.go
@@ -70,6 +70,45 @@ func (nq NQuad) ToEdge(instanceIdx,
 	return result, nil
 }
 
+func toUid(xid string, xidToUid map[string]uint64) (uid uint64, rerr error) {
+	id, present := xidToUid[xid]
+	if present {
+		return id, nil
+	}
+
+	if !strings.HasPrefix(xid, "_uid_:") {
+		return 0, fmt.Errorf("Unable to find xid: %v", xid)
+	}
+	return strconv.ParseUint(xid[6:], 0, 64)
+}
+
+func (nq NQuad) ToEdgeUsing(
+	xidToUid map[string]uint64) (result x.DirectedEdge, rerr error) {
+	uid, err := toUid(nq.Subject, xidToUid)
+	if err != nil {
+		return result, err
+	}
+	result.Entity = uid
+
+	if len(nq.ObjectId) == 0 {
+		result.Value = nq.ObjectValue
+	} else {
+		uid, err = toUid(nq.ObjectId, xidToUid)
+		if err != nil {
+			return result, err
+		}
+		result.ValueId = uid
+	}
+	if len(nq.Language) > 0 {
+		result.Attribute = nq.Predicate + "." + nq.Language
+	} else {
+		result.Attribute = nq.Predicate
+	}
+	result.Source = nq.Label
+	result.Timestamp = time.Now()
+	return result, nil
+}
+
 func stripBracketsIfPresent(val string) string {
 	if val[0] != '<' {
 		return val
diff --git a/server/main.go b/server/main.go
index f71564f5..2c857348 100644
--- a/server/main.go
+++ b/server/main.go
@@ -64,16 +64,54 @@ func addCorsHeaders(w http.ResponseWriter) {
 func mutationHandler(mu *gql.Mutation) error {
 	r := strings.NewReader(mu.Set)
 	scanner := bufio.NewScanner(r)
+	var nquads []rdf.NQuad
 	for scanner.Scan() {
 		ln := strings.Trim(scanner.Text(), " \t")
 		if len(ln) == 0 {
 			continue
 		}
-		_, err := rdf.Parse(ln)
+		nq, err := rdf.Parse(ln)
 		if err != nil {
 			glog.WithError(err).Error("While parsing RDF.")
 			return err
 		}
+		nquads = append(nquads, nq)
+	}
+
+	xidToUid := make(map[string]uint64)
+	for _, nq := range nquads {
+		if !strings.HasPrefix("_uid_:", nq.Subject) {
+			xidToUid[nq.Subject] = 0
+		}
+		if !strings.HasPrefix("_uid_:", nq.ObjectId) {
+			xidToUid[nq.ObjectId] = 0
+		}
+	}
+	if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
+		return err
+	}
+
+	var edges []x.DirectedEdge
+	for _, nq := range nquads {
+		edge, err := nq.ToEdgeUsing(xidToUid)
+		if err != nil {
+			glog.WithField("nquad", nq).WithError(err).
+				Error("While converting to edge")
+			return err
+		}
+		edges = append(edges, edge)
+	}
+
+	left, err := worker.MutateOverNetwork(edges)
+	if err != nil {
+		return err
+	}
+	if len(left) > 0 {
+		glog.WithField("left", len(left)).Error("Some edges couldn't be applied")
+		for _, e := range left {
+			glog.WithField("edge", e).Debug("Unable to apply mutation")
+		}
+		return fmt.Errorf("Unapplied mutations")
 	}
 	return nil
 }
diff --git a/worker/assign.go b/worker/assign.go
index 0b019dc4..52c4a721 100644
--- a/worker/assign.go
+++ b/worker/assign.go
@@ -3,12 +3,13 @@ package worker
 import (
 	"sync"
 
+	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/uid"
 	"github.com/google/flatbuffers/go"
 )
 
-func createXidListBuffer(xids map[string]bool) []byte {
+func createXidListBuffer(xids map[string]uint64) []byte {
 	b := flatbuffers.NewBuilder(0)
 	var offsets []flatbuffers.UOffsetT
 	for xid := range xids {
@@ -69,3 +70,46 @@ func getOrAssignUids(
 	b.Finish(uend)
 	return b.Bytes[b.Head():], nil
 }
+
+func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) {
+	query := new(conn.Query)
+	query.Data = createXidListBuffer(*xidToUid)
+	uo := flatbuffers.GetUOffsetT(query.Data)
+	xidList := new(task.XidList)
+	xidList.Init(query.Data, uo)
+
+	reply := new(conn.Reply)
+	if instanceIdx == 0 {
+		uo := flatbuffers.GetUOffsetT(query.Data)
+		xidList := new(task.XidList)
+		xidList.Init(query.Data, uo)
+
+		reply.Data, rerr = getOrAssignUids(xidList)
+		if rerr != nil {
+			return rerr
+		}
+	} else {
+		pool := pools[0]
+		if err := pool.Call("Worker.GetOrAssign", query, reply); err != nil {
+			glog.WithField("method", "GetOrAssign").WithError(err).
+				Error("While getting uids")
+			return err
+		}
+	}
+
+	uidList := new(task.UidList)
+	uo = flatbuffers.GetUOffsetT(reply.Data)
+	uidList.Init(reply.Data, uo)
+
+	if xidList.XidsLength() != uidList.UidsLength() {
+		glog.WithField("num_xids", xidList.XidsLength()).
+			WithField("num_uids", uidList.UidsLength()).
+			Fatal("Num xids don't match num uids")
+	}
+	for i := 0; i < xidList.XidsLength(); i++ {
+		xid := string(xidList.Xids(i))
+		uid := uidList.Uids(i)
+		(*xidToUid)[xid] = uid
+	}
+	return nil
+}
diff --git a/worker/assign_test.go b/worker/assign_test.go
index 952e0017..3905b639 100644
--- a/worker/assign_test.go
+++ b/worker/assign_test.go
@@ -8,10 +8,10 @@ import (
 )
 
 func TestXidListBuffer(t *testing.T) {
-	xids := map[string]bool{
-		"b.0453": true,
-		"d.z1sz": true,
-		"e.abcd": true,
+	xids := map[string]uint64{
+		"b.0453": 0,
+		"d.z1sz": 0,
+		"e.abcd": 0,
 	}
 
 	buf := createXidListBuffer(xids)
@@ -26,10 +26,10 @@ func TestXidListBuffer(t *testing.T) {
 	for i := 0; i < xl.XidsLength(); i++ {
 		xid := string(xl.Xids(i))
 		t.Logf("Found: %v", xid)
-		xids[xid] = false
+		xids[xid] = 7
 	}
-	for xid, untouched := range xids {
-		if untouched {
+	for xid, val := range xids {
+		if val != 7 {
 			t.Errorf("Expected xid: %v to be part of the buffer.", xid)
 		}
 	}
-- 
GitLab