Skip to content
Snippets Groups Projects
Commit 32aba405 authored by Manish R Jain's avatar Manish R Jain
Browse files

Assign uids for xids over the network. Convert NQuads to Edges using the...

Assign uids for xids over the network. Convert NQuads to Edges using the assigned uids, and then apply these as mutations over the network.
parent 0f7e05ae
No related branches found
No related tags found
No related merge requests found
......@@ -70,6 +70,45 @@ func (nq NQuad) ToEdge(instanceIdx,
return result, nil
}
func toUid(xid string, xidToUid map[string]uint64) (uid uint64, rerr error) {
id, present := xidToUid[xid]
if present {
return id, nil
}
if !strings.HasPrefix(xid, "_uid_:") {
return 0, fmt.Errorf("Unable to find xid: %v", xid)
}
return strconv.ParseUint(xid[6:], 0, 64)
}
func (nq NQuad) ToEdgeUsing(
xidToUid map[string]uint64) (result x.DirectedEdge, rerr error) {
uid, err := toUid(nq.Subject, xidToUid)
if err != nil {
return result, err
}
result.Entity = uid
if len(nq.ObjectId) == 0 {
result.Value = nq.ObjectValue
} else {
uid, err = toUid(nq.ObjectId, xidToUid)
if err != nil {
return result, err
}
result.ValueId = uid
}
if len(nq.Language) > 0 {
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Source = nq.Label
result.Timestamp = time.Now()
return result, nil
}
func stripBracketsIfPresent(val string) string {
if val[0] != '<' {
return val
......
......@@ -64,16 +64,54 @@ func addCorsHeaders(w http.ResponseWriter) {
func mutationHandler(mu *gql.Mutation) error {
r := strings.NewReader(mu.Set)
scanner := bufio.NewScanner(r)
var nquads []rdf.NQuad
for scanner.Scan() {
ln := strings.Trim(scanner.Text(), " \t")
if len(ln) == 0 {
continue
}
_, err := rdf.Parse(ln)
nq, err := rdf.Parse(ln)
if err != nil {
glog.WithError(err).Error("While parsing RDF.")
return err
}
nquads = append(nquads, nq)
}
xidToUid := make(map[string]uint64)
for _, nq := range nquads {
if !strings.HasPrefix("_uid_:", nq.Subject) {
xidToUid[nq.Subject] = 0
}
if !strings.HasPrefix("_uid_:", nq.ObjectId) {
xidToUid[nq.ObjectId] = 0
}
}
if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
return err
}
var edges []x.DirectedEdge
for _, nq := range nquads {
edge, err := nq.ToEdgeUsing(xidToUid)
if err != nil {
glog.WithField("nquad", nq).WithError(err).
Error("While converting to edge")
return err
}
edges = append(edges, edge)
}
left, err := worker.MutateOverNetwork(edges)
if err != nil {
return err
}
if len(left) > 0 {
glog.WithField("left", len(left)).Error("Some edges couldn't be applied")
for _, e := range left {
glog.WithField("edge", e).Debug("Unable to apply mutation")
}
return fmt.Errorf("Unapplied mutations")
}
return nil
}
......
......@@ -3,12 +3,13 @@ package worker
import (
"sync"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/uid"
"github.com/google/flatbuffers/go"
)
func createXidListBuffer(xids map[string]bool) []byte {
func createXidListBuffer(xids map[string]uint64) []byte {
b := flatbuffers.NewBuilder(0)
var offsets []flatbuffers.UOffsetT
for xid := range xids {
......@@ -69,3 +70,46 @@ func getOrAssignUids(
b.Finish(uend)
return b.Bytes[b.Head():], nil
}
func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) {
query := new(conn.Query)
query.Data = createXidListBuffer(*xidToUid)
uo := flatbuffers.GetUOffsetT(query.Data)
xidList := new(task.XidList)
xidList.Init(query.Data, uo)
reply := new(conn.Reply)
if instanceIdx == 0 {
uo := flatbuffers.GetUOffsetT(query.Data)
xidList := new(task.XidList)
xidList.Init(query.Data, uo)
reply.Data, rerr = getOrAssignUids(xidList)
if rerr != nil {
return rerr
}
} else {
pool := pools[0]
if err := pool.Call("Worker.GetOrAssign", query, reply); err != nil {
glog.WithField("method", "GetOrAssign").WithError(err).
Error("While getting uids")
return err
}
}
uidList := new(task.UidList)
uo = flatbuffers.GetUOffsetT(reply.Data)
uidList.Init(reply.Data, uo)
if xidList.XidsLength() != uidList.UidsLength() {
glog.WithField("num_xids", xidList.XidsLength()).
WithField("num_uids", uidList.UidsLength()).
Fatal("Num xids don't match num uids")
}
for i := 0; i < xidList.XidsLength(); i++ {
xid := string(xidList.Xids(i))
uid := uidList.Uids(i)
(*xidToUid)[xid] = uid
}
return nil
}
......@@ -8,10 +8,10 @@ import (
)
func TestXidListBuffer(t *testing.T) {
xids := map[string]bool{
"b.0453": true,
"d.z1sz": true,
"e.abcd": true,
xids := map[string]uint64{
"b.0453": 0,
"d.z1sz": 0,
"e.abcd": 0,
}
buf := createXidListBuffer(xids)
......@@ -26,10 +26,10 @@ func TestXidListBuffer(t *testing.T) {
for i := 0; i < xl.XidsLength(); i++ {
xid := string(xl.Xids(i))
t.Logf("Found: %v", xid)
xids[xid] = false
xids[xid] = 7
}
for xid, untouched := range xids {
if untouched {
for xid, val := range xids {
if val != 7 {
t.Errorf("Expected xid: %v to be part of the buffer.", xid)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment