From 82085af99a01ad80fc9a327b7c57285ceed7e7eb Mon Sep 17 00:00:00 2001
From: Ashwin <ashwin2007ray@gmail.com>
Date: Thu, 28 Jan 2016 08:16:18 +0000
Subject: [PATCH] Changes made for assigning uids in a separate pass

---
 loader/loader.go           |  68 +++++++++++++++++++++++++++++
 server/loader/.main.go.swp | Bin 0 -> 12288 bytes
 server/uidassigner/main.go |  85 +++++++++++++++++++++++++++++++++++++
 3 files changed, 153 insertions(+)
 create mode 100644 server/loader/.main.go.swp
 create mode 100644 server/uidassigner/main.go

diff --git a/loader/loader.go b/loader/loader.go
index 0ff5353e..bbf3309b 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -149,6 +149,32 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 	wg.Done()
 }
 
+// handleNQuadsWhileAssign drains s.cnq, converting each sampled NQuad to an edge
+// and logging it. wg.Done is deferred so the error return cannot stall wg.Wait.
+func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
+	defer wg.Done()
+	for nq := range s.cnq {
+		if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
+			// Ignore due to mod sampling.
+			atomic.AddUint64(&s.ctr.ignored, 1)
+			continue
+		}
+
+		edge, err := nq.ToEdge()
+		for err != nil {
+			// Retry only on temporary errors; anything else stops this worker.
+			if err != posting.E_TMP_ERROR {
+				glog.WithError(err).WithField("nq", nq).
+					Error("While converting to edge")
+				return
+			}
+			time.Sleep(time.Microsecond)
+			edge, err = nq.ToEdge()
+		}
+		glog.Info(edge)
+	}
+}
+
 // Blocking function.
 func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	s := new(state)
@@ -189,3 +215,45 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	ticker.Stop()
 	return atomic.LoadUint64(&s.ctr.processed), nil
 }
+
+// HandleRdfReaderWhileAssign streams RDF lines from reader through the parse
+// and edge-conversion stages, blocking until all parsers and workers finish.
+func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
+	st := &state{ctr: new(counters)}
+	ticker := time.NewTicker(time.Second)
+	go st.printCounters(ticker)
+
+	// Stage 1: buffer raw input lines into a channel.
+	st.mod = mod
+	st.input = make(chan string, 10000)
+	go st.readLines(reader)
+
+	// Stage 2: one parser goroutine per GOMAXPROCS turns input into NQuads.
+	st.cnq = make(chan rdf.NQuad, 10000)
+	parsers := runtime.GOMAXPROCS(-1)
+	parseErrs := make(chan error, parsers)
+	for p := 0; p < parsers; p++ {
+		go st.parseStream(parseErrs)
+	}
+
+	// Stage 3: NQuad workers; this is the part that differs from HandleRdfReader.
+	var workers sync.WaitGroup
+	workers.Add(3000)
+	for w := 0; w < 3000; w++ {
+		go st.handleNQuadsWhileAssign(&workers)
+	}
+
+	// Collect one result per parser; any error aborts the process.
+	for p := 0; p < parsers; p++ {
+		if err := <-parseErrs; err != nil {
+			glog.WithError(err).Fatal("While reading input.")
+		}
+	}
+
+	// All parsers have reported, so close cnq and wait for the workers to drain it.
+	close(st.cnq)
+	workers.Wait()
+	ticker.Stop()
+	return atomic.LoadUint64(&st.ctr.processed), nil
+}
+
diff --git a/server/loader/.main.go.swp b/server/loader/.main.go.swp
new file mode 100644
index 0000000000000000000000000000000000000000..2f79c005230d613b1ec91a204fa5c9de368b0154
GIT binary patch
literal 12288
zcmYc?2=nw+FxN9-U|?VnU|`s7J2mW@8au;9JqCuv;*9dlJR<`G^P<E`kUS2Yl3tWp
zkfEEBSzJ<-nN(VmnxdPMT862(4#hzI^!)hp{Gx3A;-X~z^vsfs(j>j){9JvgX_@&@
zmVR++QCVt{eolU3N@|gQZenJhUV1*J>7yc}Aut*OI76VcBu&?XH`Lh3&;X=HSxHer
zSSS?49L1v{Fd71*Aut*OqaiRF0;3@?8UmvsFd70QBm_zd7#Zpr7#NtK{!N0?jA%5J
zJ4%g)z-S1JhQMeDjE2By2#kinXb6mkz-S1JhQMeDjE2By2n@jxNK9d12w-7g_`(XA
z|A+Pe|MD|19OY+VIK<DuP|wf6V9n3KAi&SSz{}6TaF>sP;T9hQ!(=`N20uOq1|~iR
zhTXgj3^REd7^d<vFihcPU})!MU})oIV5sF~V94QRU`XL*V2I;oU<l@AU~uAPU@+%p
zU@+rlV36QtVEDwtz;K#}fngaB14Anh14A<p149!J149xI14AMY1A`e40|PS;1H(3M
z28L8_1_nuP1_lXk1_mx}28KUe3=Ge?7#I$6F)-}oVqjRz#lX<O#lVou#lR57#lR58
z#lT?A#lRrL#lRrR#lRrJ#lY~DlY!v|Cj-MeP6mcfP6mbqP6h@WP6h^RP6h@=P6mdr
z91IMfI2agiaWF6(;b36c#KFL@k%NI@76${vOb!NyLJkIoU=9WbTMh;WLk<Q8T@D6@
zU+fGF57-$P&apEvOl4<an8?n+FoB(cp@^M<A%UHNL5ZD#L5`h)L6)6?;Vl~j!)-PO
zh6QX440G5R7}D7o7^2x282s577<||m7_`_J7{0MW;shKoLoi*9I(ak%Mnhn5hCnR?
zXF+~(NoHQUo^NVVda6%maY?a;p@D&dmO@@>u5&?Yv8IB)z5+<3Ahk##Ilm}XkAX8i
zCqG@!GcPS)L&-TYCnqy6T_G2wT_FdgUrCdJvzCFAGfmGqC%-sVLz97%vj~X|)*YT%
zlHrz_nv<fTl$>9hSE8h&0H!se<_5XA6)O}Z78R$afXo9M2(cHW)3vB5zeppss7O=K
zEwLmqM?)z*BQqyeAtNy_1?-%nlr)8+RM0XpJ+Su7G=<cnA_YZTg}lrhg=z*)POxn{
zU{Nbu1;~;zJr9un)S#5KAh2GIA{_;--24<x1_n-oE(py_Ov*`BD9KkyE=o--NmWR%
z$}Aw@h$6THK#KMJQp=$hr;%u7enD!U0?5;8nK`MryqpHNEx%aLzaTYFqad*)0~9^z
zQ34WI(oq0Wum}MM4Je`LL5v5*FlTapUP)$NX(|IJXJ(p04$MRaTU!MKP$;M67b(O;
z%u%qiRVYf#OHWk*Ik6Zlo0*eZ3=#*e7R*a8)(bAk$t=;(DoROnugWYa)=^N>Q33@7
zERA~RWtM1w#Pc$9K;kK>X{kjD1;vQORZy%4R-^^h;*wdU$-r3vwk0pMT%))Il!Ajn
zBs7(Ry$(%8e)$SPE^Z3$QJw)%R~8qfCTFH)LJ}d=i*Wa8!u-j=$qC5~!Koz*xrr4D
z1x5ME#h|2+SDLGkTu@pJPK2P83r+I{MX6=+xrr5Eg`kiwN-YZjY0`vcFC@izrMdCR
z1*J+l@I0x<zzH$|tOFE)MWuNqnYpQY?*6`x5dlH|&cPb6?90FjWg)BcE6sHd2nFTL
zoc#2n(qg^f)DoZ6veX<6C?Dh<umC8SK~W72>Vl&DG`--G`~v5I(14<RP*j3U;Vghk
zBo>t*$)thO7DzWF=R!gmlxrC>!3hQ}OQNJtXK*Ic&`K^S1(}_hld8!8OJvA`3U-jB
z2C@b@E)m%gT#6{9<s_zqqaia*K@r5!3jmic8k*pwn3I?e5zx?NNGr`tRsgLo1<Ns%
zB^H6=5b6pATd+}j!6ikRdFdKT2=^)JC@6t&c~NFbDo6(?8Gu3vln_9)k|x+JMAAez
z3o26#G6jTPGK*4^OY(~<K_-Bct^!mEyrcy=E;l~~W>RQoUWu8BhEi^Rijt0kp^k!*
ze_l?dLP2J7wnA!NNoGlAYO#(&c}8kcszPaIih`;FShKBy0XSDHIp^o*CMp!C79<uW
zmZYXY$}o`Iixoh5Jux#6RM>+uNlIc#BGf~${0egfENF^S(m=%;)H@7d&x4{!!B(L{
z&nG`!LkUvs#)HBdl+iUAIF<0N|E*9Wpa2|3gcL!8kdQKn+t5@6XBL$f6sP9tL-L6d
z0|TcLwDbb0gyutiaCSfv05S86K_n>Uf#~G?+=8Oi;$nSJ2q`gS<`(1^l_+R1Fcc&v
zXD6nog3})ZgMyYmgMyYqPG)XqNn%N6eqOOcX<kZdkwQsEs)A2ua%x_2svby1aB3>D
zm;$Ke1*w4+V95$OiFxUzAcNEM%TkNL(O8gLl$%*x3^p|}F9l>wxMzrme`tt8xMNU|
zqhE-pYp{ZUkb<+npNnURr@vpYg1?)BqhF+gx2K<rjzVf?Nk(dsLTUxrD+>8V3YobD
zIhm;`U?(B10>^Z;LT0f7vV#0Pg~U7sCC6X|&tN45C&ysVU>%TyLi2J`i;ESCQVUBn
zi&9e*k}4Gv3kq^Flfjj3PGUL8*2MIp)YKGECdtfG0A;_-ymTFf;{3Fd^2DN41_dpU
zNt~P+B_#z``ugSN<$8$)iOCtMdih1^`Z<tLEY|n&bawR%cGWe~Gk|J|%r8~QO{`SN
zPbx{w%u`5INX{>)RLD<53F*WVkm;3BrFr=!3Z=!MxX3I9*C7h270IauB?_5&3ZOX8
z$xO^kPE{z+EXhDN1sq_|_)I|xEl03BVKJ?v5SCg5iZ2BtJp%;|Q2Z)EH7RLYLmlm$
zUr<?;nVwOiU}Ruus^FWLmsy;l5TxJ*@`6n+m|K*Un3?C0o|~AN16pZs$Dpsp002>m
BoWlSB

literal 0
HcmV?d00001

diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go
new file mode 100644
index 00000000..80bbe18b
--- /dev/null
+++ b/server/uidassigner/main.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+        "compress/gzip"
+        "flag"
+        "os"
+        "runtime"
+        "runtime/pprof"
+        "strings"
+
+        "github.com/Sirupsen/logrus"
+        "github.com/dgraph-io/dgraph/loader"
+        "github.com/dgraph-io/dgraph/posting"
+        "github.com/dgraph-io/dgraph/store"
+        "github.com/dgraph-io/dgraph/x"
+)
+
+var glog = x.Log("uidassigner_main") // logger used by main below
+
+var rdfGzips = flag.String("rdfgzips", "",
+        "Comma separated gzip files containing RDF data") // split on "," in main; empty entries are skipped
+var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") // NOTE(review): the loader samples on Fingerprint64(subject) % mod — confirm the help text
+var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") // handed to store.Init in main
+var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") // empty string leaves profiling off
+
+// main runs the uid assignment pass: it initializes the store at -uidpostings,
+// feeds every gzipped RDF file in -rdfgzips to the loader, then merges lists.
+func main() {
+	flag.Parse()
+	if len(*cpuprofile) > 0 {
+		f, err := os.Create(*cpuprofile)
+		if err != nil {
+			glog.Fatal(err)
+		}
+		// StartCPUProfile fails if profiling is already on; do not ignore it.
+		if err := pprof.StartCPUProfile(f); err != nil {
+			glog.WithError(err).Fatal("Unable to start cpu profile.")
+		}
+		defer pprof.StopCPUProfile()
+	}
+
+	logrus.SetLevel(logrus.InfoLevel)
+	numCpus := runtime.NumCPU()
+	prevProcs := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpus", numCpus).
+		WithField("prev_maxprocs", prevProcs).
+		Info("Set max procs to num  cpus")
+
+	if len(*rdfGzips) == 0 {
+		glog.Fatal("No RDF GZIP files specified")
+	}
+
+	ps := new(store.Store)
+	ps.Init(*uidDir)
+	defer ps.Close()
+	posting.Init(ps, nil)
+
+	files := strings.Split(*rdfGzips, ",")
+	for _, path := range files {
+		if len(path) == 0 {
+			continue
+		}
+		glog.WithField("path", path).Info("Handling...")
+		f, err := os.Open(path)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to open rdf file.")
+		}
+
+		r, err := gzip.NewReader(f)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to create gzip reader.")
+		}
+
+		count, err := loader.HandleRdfReaderWhileAssign(r, *mod)
+		if err != nil {
+			glog.WithError(err).Fatal("While handling rdf reader.")
+		}
+		glog.WithField("count", count).Info("RDFs parsed")
+		r.Close()
+		f.Close()
+	}
+	glog.Info("Calling merge lists")
+	posting.MergeLists(100 * numCpus) // 100 per core.
+}
+
-- 
GitLab