From 2e5615e6aa45f984e8ccf20140d470cc17b55c82 Mon Sep 17 00:00:00 2001
From: Ashwin <ashwin2007ray@gmail.com>
Date: Mon, 1 Feb 2016 06:20:08 +0000
Subject: [PATCH] Change arguments of GetUid to take the machine Id and the
 number of Instances

---
 loader/loader.go           | 22 +++++++++++-----------
 query/query.go             |  2 +-
 rdf/parse.go               | 10 +++++-----
 server/uidassigner/main.go |  5 +++--
 uid/assigner.go            | 10 +++++-----
 uid/assigner_test.go       |  6 +++---
 6 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/loader/loader.go b/loader/loader.go
index fe386973..84968e59 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -47,6 +47,7 @@ type state struct {
 	cnq   chan rdf.NQuad
 	ctr   *counters
 	mod   uint64
+	numInstance uint64
 }
 
 func (s *state) printCounters(ticker *time.Ticker) {
@@ -127,7 +128,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 			continue
 		}
 
-		edge, err := nq.ToEdge()
+		edge, err := nq.ToEdge(s.mod, s.numInstance)
 		for err != nil {
 			// Just put in a retry loop to tackle temporary errors.
 			if err == posting.E_TMP_ERROR {
@@ -138,7 +139,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 					Error("While converting to edge")
 				return
 			}
-			edge, err = nq.ToEdge()
+			edge, err = nq.ToEdge(s.mod, s.numInstance)
 		}
 
 		key := posting.Key(edge.Entity, edge.Attribute)
@@ -151,12 +152,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 
 func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
 	for nq := range s.cnq {
-		if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
+		if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.mod {
 			// This instance shouldnt assign UID to this string
 			atomic.AddUint64(&s.ctr.ignored, 1)
-			continue
 		} else {
-			_, err := rdf.GetUid(nq.Subject)
+			_, err := rdf.GetUid(nq.Subject, s.mod, s.numInstance)
 			for err != nil {
 				// Just put in a retry loop to tackle temporary errors.
 				if err == posting.E_TMP_ERROR {
@@ -166,16 +166,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
 						Error("While getting UID")
 					return
 				}
-				_, err = rdf.GetUid(nq.Subject)
+				_, err = rdf.GetUid(nq.Subject, s.mod, s.numInstance)
 			}
 		}
 
-		if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.mod != 0 {
+		if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.mod {
                         // This instance shouldnt or cant assign UID to this string
                         atomic.AddUint64(&s.ctr.ignored, 1)
-                        continue
                 } else {
-                        _, err := rdf.GetUid(nq.ObjectId)
+                        _, err := rdf.GetUid(nq.ObjectId, s.mod, s.numInstance)
                         for err != nil {
                                 // Just put in a retry loop to tackle temporary errors.
                                 if err == posting.E_TMP_ERROR {
@@ -185,7 +184,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
                                                 Error("While getting UID")
                                         return
                                 }
-                                _, err = rdf.GetUid(nq.ObjectId)
+                                _, err = rdf.GetUid(nq.ObjectId, s.mod, s.numInstance)
                         }
                 }
 	}
@@ -234,7 +233,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 }
 
 // Blocking function.
-func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
+func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64, numInstance uint64) (uint64, error) {
 	s := new(state)
 	s.ctr = new(counters)
 	ticker := time.NewTicker(time.Second)
@@ -242,6 +241,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
 
 	// Producer: Start buffering input to channel.
 	s.mod = mod
+	s.numInstance = numInstance
 	s.input = make(chan string, 10000)
 	go s.readLines(reader)
 
diff --git a/query/query.go b/query/query.go
index 2d1ccb20..3ee69a2a 100644
--- a/query/query.go
+++ b/query/query.go
@@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
 	// This would set the Result field in SubGraph,
 	// and populate the children for attributes.
 	if len(exid) > 0 {
-		u, err := uid.GetOrAssign(exid)
+		u, err := uid.GetOrAssign(exid, 0, 1)  // mod = 0, numInstances = 1 by default
 		if err != nil {
 			x.Err(glog, err).WithField("xid", exid).Error(
 				"While GetOrAssign uid from external id")
diff --git a/rdf/parse.go b/rdf/parse.go
index 809ef63e..58f464d9 100644
--- a/rdf/parse.go
+++ b/rdf/parse.go
@@ -36,21 +36,21 @@ type NQuad struct {
 	Language    string
 }
 
-func GetUid(s string) (uint64, error) {
+func GetUid(s string, mod uint64, numInst uint64) (uint64, error) {
 	if strings.HasPrefix(s, "_uid_:") {
 		return strconv.ParseUint(s[6:], 0, 64)
 	}
-	return uid.GetOrAssign(s)
+	return uid.GetOrAssign(s, mod, numInst)
 }
 
-func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
-	sid, err := GetUid(nq.Subject)
+func (nq NQuad) ToEdge(mod, numInst uint64) (result x.DirectedEdge, rerr error) {
+	sid, err := GetUid(nq.Subject, mod, numInst)
 	if err != nil {
 		return result, err
 	}
 	result.Entity = sid
 	if len(nq.ObjectId) > 0 {
-		oid, err := GetUid(nq.ObjectId)
+		oid, err := GetUid(nq.ObjectId, mod, numInst)
 		if err != nil {
 			return result, err
 		}
diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go
index c9babee4..422c7642 100644
--- a/server/uidassigner/main.go
+++ b/server/uidassigner/main.go
@@ -19,7 +19,8 @@ var glog = x.Log("uidassigner_main")
 
 var rdfGzips = flag.String("rdfgzips", "",
         "Comma separated gzip files containing RDF data")
-var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
+var mod = flag.Uint64("machineid", 1, "Only pick entities, where uid % mod == 0.")
+var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared")
 var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
 var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
@@ -72,7 +73,7 @@ func main() {
 			glog.WithError(err).Fatal("Unable to create gzip reader.")
 		}
 
-		count, err := loader.HandleRdfReaderWhileAssign(r, *mod)
+		count, err := loader.HandleRdfReaderWhileAssign(r, *mod, *numInstance)
 		if err != nil {
 			glog.WithError(err).Fatal("While handling rdf reader.")
 		}
diff --git a/uid/assigner.go b/uid/assigner.go
index 85ad3950..8e486e2b 100644
--- a/uid/assigner.go
+++ b/uid/assigner.go
@@ -92,7 +92,7 @@ func init() {
 	// go lmgr.clean()
 }
 
-func allocateUniqueUid(xid string) (uid uint64, rerr error) {
+func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) {
 	for sp := ""; ; sp += " " {
 		txid := xid + sp
 		uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
@@ -133,7 +133,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) {
 		" Wake the stupid developer up.")
 }
 
-func assignNew(pl *posting.List, xid string) (uint64, error) {
+func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64, error) {
 	entry := lmgr.newOrExisting(xid)
 	entry.Lock()
 	entry.ts = time.Now()
@@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) {
 	}
 
 	// No current id exists. Create one.
-	uid, err := allocateUniqueUid(xid)
+	uid, err := allocateUniqueUid(xid, mod, numInst)
 	if err != nil {
 		return 0, err
 	}
@@ -173,11 +173,11 @@ func stringKey(xid string) []byte {
 	return buf.Bytes()
 }
 
-func GetOrAssign(xid string) (uid uint64, rerr error) {
+func GetOrAssign(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) {
 	key := stringKey(xid)
 	pl := posting.GetOrCreate(key)
 	if pl.Length() == 0 {
-		return assignNew(pl, xid)
+		return assignNew(pl, xid, mod, numInst)
 
 	} else if pl.Length() > 1 {
 		glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
diff --git a/uid/assigner_test.go b/uid/assigner_test.go
index bf6d7331..826c8858 100644
--- a/uid/assigner_test.go
+++ b/uid/assigner_test.go
@@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) {
 
 	var u1, u2 uint64
 	{
-		uid, err := GetOrAssign("externalid0")
+		uid, err := GetOrAssign("externalid0", 0, 1)
 		if err != nil {
 			t.Error(err)
 		}
@@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) {
 	}
 
 	{
-		uid, err := GetOrAssign("externalid1")
+		uid, err := GetOrAssign("externalid1", 0, 1)
 		if err != nil {
 			t.Error(err)
 		}
@@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) {
 	// return
 
 	{
-		uid, err := GetOrAssign("externalid0")
+		uid, err := GetOrAssign("externalid0", 0, 1)
 		if err != nil {
 			t.Error(err)
 		}
-- 
GitLab