From 6f460e6d61dfb7b1280b7927468ae1f8176bf1bd Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Tue, 17 Nov 2015 18:03:19 +1100
Subject: [PATCH] Randomize input to avoid contention for RDFs containing the
 same subject. Don't call assigner clean(), because it hangs the entire
 process.

---
 loader/loader.go         |  50 ++++++++++++++-----
 server/loader/.gitignore |   1 +
 server/loader/main.go    | 102 +++++++++++++++++++++++++++++++++++++++
 uid/assigner.go          |   2 +-
 4 files changed, 142 insertions(+), 13 deletions(-)
 create mode 100644 server/loader/.gitignore
 create mode 100644 server/loader/main.go

diff --git a/loader/loader.go b/loader/loader.go
index a276b192..00821ba6 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -19,6 +19,7 @@ package loader
 import (
 	"bufio"
 	"io"
+	"math/rand"
 	"runtime"
 	"strings"
 	"sync"
@@ -48,26 +49,47 @@ type state struct {
 	mod   uint64
 }
 
-func printCounters(ticker *time.Ticker, c *counters) {
+// printCounters logs a snapshot of the loader counters on every ticker tick.
+func (s *state) printCounters(ticker *time.Ticker) {
 	for _ = range ticker.C {
+		parsed := atomic.LoadUint64(&s.ctr.parsed)
+		ignored := atomic.LoadUint64(&s.ctr.ignored)
+		processed := atomic.LoadUint64(&s.ctr.processed)
 		glog.WithFields(logrus.Fields{
-			"read":      atomic.LoadUint64(&c.read),
-			"parsed":    atomic.LoadUint64(&c.parsed),
-			"processed": atomic.LoadUint64(&c.processed),
-			"ignored":   atomic.LoadUint64(&c.ignored),
+			"read":      atomic.LoadUint64(&s.ctr.read),
+			"processed": processed,
+			"parsed":    parsed,
+			"ignored":   ignored,
+			"pending":   parsed - ignored - processed,
+			"len_cnq":   len(s.cnq),
 		}).Info("Counters")
 	}
 }
 
 func (s *state) readLines(r io.Reader) {
+	var buf []string
 	scanner := bufio.NewScanner(r)
+	// Randomize lines to avoid contention on same subject.
+	for i := 0; i < 1000; i++ {
+		if scanner.Scan() {
+			buf = append(buf, scanner.Text())
+		} else {
+			break
+		}
+	}
+	ln := len(buf)
 	for scanner.Scan() {
-		s.input <- scanner.Text()
+		k := rand.Intn(ln)
+		s.input <- buf[k]
+		buf[k] = scanner.Text()
 		atomic.AddUint64(&s.ctr.read, 1)
 	}
 	if err := scanner.Err(); err != nil {
 		glog.WithError(err).Fatal("While reading file.")
 	}
+	for i := 0; i < len(buf); i++ {
+		s.input <- buf[i]
+	}
 	close(s.input)
 }
 
@@ -119,29 +141,33 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	s := new(state)
 	s.ctr = new(counters)
 	ticker := time.NewTicker(time.Second)
-	go printCounters(ticker, s.ctr)
+	go s.printCounters(ticker)
 
 	// Producer: Start buffering input to channel.
 	s.mod = mod
 	s.input = make(chan string, 10000)
 	go s.readLines(reader)
 
-	numr := runtime.GOMAXPROCS(-1)
 	s.cnq = make(chan rdf.NQuad, 10000)
+	numr := runtime.GOMAXPROCS(-1)
 	done := make(chan error, numr)
-	wg := new(sync.WaitGroup)
 	for i := 0; i < numr; i++ {
-		wg.Add(1)
 		go s.parseStream(done) // Input --> NQuads
-		go s.handleNQuads(wg)  // NQuads --> Posting list [slow].
 	}
 
-	// The following will block until all ParseStream goroutines finish.
+	wg := new(sync.WaitGroup)
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go s.handleNQuads(wg) // NQuads --> Posting list [slow].
+	}
+
+	// Block until all parseStream goroutines are finished.
 	for i := 0; i < numr; i++ {
 		if err := <-done; err != nil {
 			glog.WithError(err).Fatal("While reading input.")
 		}
 	}
+
 	close(s.cnq)
 	// Okay, we've stopped input to cnq, and closed it.
 	// Now wait for handleNQuads to finish.
diff --git a/server/loader/.gitignore b/server/loader/.gitignore
new file mode 100644
index 00000000..b57fbba6
--- /dev/null
+++ b/server/loader/.gitignore
@@ -0,0 +1 @@
+/loader
diff --git a/server/loader/main.go b/server/loader/main.go
new file mode 100644
index 00000000..3c3af7d3
--- /dev/null
+++ b/server/loader/main.go
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * 		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"compress/gzip"
+	"flag"
+	"os"
+	"runtime"
+	"runtime/pprof"
+	"strings"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/dgraph-io/dgraph/commit"
+	"github.com/dgraph-io/dgraph/loader"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/store"
+	"github.com/dgraph-io/dgraph/x"
+)
+
+var glog = x.Log("loader_main")
+
+var rdfGzips = flag.String("rdfgzips", "",
+	"Comma separated gzip files containing RDF data")
+var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
+var numgo = flag.Int("numgo", 4,
+	"Number of goroutines to use for reading file.") // NOTE(review): numgo is never referenced in this file; confirm it is still needed.
+var postingDir = flag.String("postings", "", "Directory to store posting lists")
+var mutationDir = flag.String("mutations", "", "Directory to store mutations")
+var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
+
+// main passes each gzip file in -rdfgzips to loader.HandleRdfReader, then calls posting.MergeLists.
+func main() {
+	flag.Parse()
+	if len(*cpuprofile) > 0 {
+		f, err := os.Create(*cpuprofile)
+		if err != nil {
+			glog.Fatal(err)
+		}
+		if err := pprof.StartCPUProfile(f); err != nil {
+			glog.WithError(err).Fatal("Unable to start CPU profile.")
+		}
+		defer pprof.StopCPUProfile()
+	}
+
+	logrus.SetLevel(logrus.InfoLevel)
+	glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
+
+	if len(*rdfGzips) == 0 {
+		glog.Fatal("No RDF GZIP files specified")
+	}
+	ps := new(store.Store)
+	ps.Init(*postingDir)
+	defer ps.Close()
+
+	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
+	clog.SkipWrite = true // Don't write to commit logs.
+	clog.Init()
+	defer clog.Close()
+	posting.Init(ps, clog)
+
+	files := strings.Split(*rdfGzips, ",")
+	for _, path := range files {
+		if len(path) == 0 {
+			continue
+		}
+		glog.WithField("path", path).Info("Handling...")
+		f, err := os.Open(path)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to open rdf file.")
+		}
+
+		r, err := gzip.NewReader(f)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to create gzip reader.")
+		}
+
+		count, err := loader.HandleRdfReader(r, *mod)
+		if err != nil {
+			glog.Fatal(err)
+		}
+		glog.WithField("count", count).Info("RDFs parsed")
+		r.Close()
+		f.Close()
+	}
+	glog.Info("Calling merge lists")
+	posting.MergeLists(100)
+}
diff --git a/uid/assigner.go b/uid/assigner.go
index cf6aace7..c2b85fe8 100644
--- a/uid/assigner.go
+++ b/uid/assigner.go
@@ -89,7 +89,7 @@ func (lm *lockManager) clean() {
 func init() {
 	lmgr = new(lockManager)
 	lmgr.locks = make(map[string]*entry)
-	go lmgr.clean()
+	// Disabled because clean() hangs the entire process: go lmgr.clean()
 }
 
 func allocateUniqueUid(xid string) (uid uint64, rerr error) {
-- 
GitLab