From 9fb14d45cba6789ac440ed81344d6800f4e98532 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Tue, 17 Nov 2015 16:42:26 +1100
Subject: [PATCH] Move loader out of server. HandleRdfReader to its own
 package.

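The RDF loading pipeline (readLines -> parseStream -> handleNQuads)
moves out of server/main.go into the new loader package, so the server
binary no longer bundles bulk loading and other binaries can reuse it.
The posting package now exposes two merge entry points:
StartPeriodicMerging for a long-running server, and MergeLists to
flush all posting lists once a bulk load finishes. Since loading no
longer happens in-process, the server keeps commit logs on from the
start (SkipWrite = false, SyncEvery = 1).

A minimal sketch of a standalone bulk loader built on the new API.
The directories, input filename, and MergeLists routine count are
illustrative, not part of this patch:

	package main

	import (
		"compress/gzip"
		"log"
		"os"

		"github.com/dgraph-io/dgraph/commit"
		"github.com/dgraph-io/dgraph/loader"
		"github.com/dgraph-io/dgraph/posting"
		"github.com/dgraph-io/dgraph/store"
	)

	func main() {
		ps := new(store.Store)
		ps.Init("/tmp/p") // Posting store directory; illustrative.
		defer ps.Close()

		// During a bulk load, skip commit logs, as the server used to.
		clog := commit.NewLogger("/tmp/m", "dgraph", 50<<20)
		clog.SkipWrite = true
		clog.Init()
		defer clog.Close()
		posting.Init(ps, clog)

		f, err := os.Open("data.nt.gz") // Gzipped RDF input.
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()
		r, err := gzip.NewReader(f)
		if err != nil {
			log.Fatal(err)
		}
		defer r.Close()

		// mod=1 keeps every entity; mod=N samples roughly 1/N of them.
		count, err := loader.HandleRdfReader(r, 1)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("Processed %d RDFs", count)

		// Flush every dirty posting list before exit. MergeLists
		// blocks until all queued lists are merged.
		posting.MergeLists(100)
	}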
---
 loader/loader.go    | 152 ++++++++++++++++++++++++++++++++++++
 posting/lists.go    | 115 +++++++++++++++++++---------
 server/main.go      | 182 ++------------------------------------------
 server/main_test.go |   3 +-
 4 files changed, 240 insertions(+), 212 deletions(-)
 create mode 100644 loader/loader.go

diff --git a/loader/loader.go b/loader/loader.go
new file mode 100644
index 00000000..a276b192
--- /dev/null
+++ b/loader/loader.go
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * 		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loader
+
+import (
+	"bufio"
+	"io"
+	"runtime"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/rdf"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
+)
+
+var glog = x.Log("loader")
+
+type counters struct {
+	read      uint64
+	parsed    uint64
+	processed uint64
+	ignored   uint64
+}
+
+type state struct {
+	input chan string
+	cnq   chan rdf.NQuad
+	ctr   *counters
+	mod   uint64
+}
+
+func printCounters(ticker *time.Ticker, c *counters) {
+	for range ticker.C {
+		glog.WithFields(logrus.Fields{
+			"read":      atomic.LoadUint64(&c.read),
+			"parsed":    atomic.LoadUint64(&c.parsed),
+			"processed": atomic.LoadUint64(&c.processed),
+			"ignored":   atomic.LoadUint64(&c.ignored),
+		}).Info("Counters")
+	}
+}
+
+func (s *state) readLines(r io.Reader) {
+	scanner := bufio.NewScanner(r)
+	for scanner.Scan() {
+		s.input <- scanner.Text()
+		atomic.AddUint64(&s.ctr.read, 1)
+	}
+	if err := scanner.Err(); err != nil {
+		glog.WithError(err).Fatal("While reading file.")
+	}
+	close(s.input)
+}
+
+func (s *state) parseStream(done chan error) {
+	for line := range s.input {
+		line = strings.Trim(line, " \t")
+		if len(line) == 0 {
+			glog.Info("Empty line.")
+			continue
+		}
+
+		glog.Debugf("Got line: %q", line)
+		nq, err := rdf.Parse(line)
+		if err != nil {
+			glog.WithError(err).Errorf("While parsing: %q", line)
+			done <- err
+			return
+		}
+		s.cnq <- nq
+		atomic.AddUint64(&s.ctr.parsed, 1)
+	}
+	done <- nil
+}
+
+func (s *state) handleNQuads(wg *sync.WaitGroup) {
+	for nq := range s.cnq {
+		if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
+			// Ignore due to mod sampling.
+			atomic.AddUint64(&s.ctr.ignored, 1)
+			continue
+		}
+
+		edge, err := nq.ToEdge()
+		if err != nil {
+			glog.WithError(err).WithField("nq", nq).Error("While converting to edge")
+			continue // A return here would skip wg.Done below and deadlock wg.Wait.
+		}
+
+		key := posting.Key(edge.Entity, edge.Attribute)
+		plist := posting.Get(key)
+		plist.AddMutation(edge, posting.Set)
+		atomic.AddUint64(&s.ctr.processed, 1)
+	}
+	wg.Done()
+}
+
+// HandleRdfReader loads RDF from reader, blocking until done. Pass mod=1 to keep every entity.
+func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
+	s := new(state)
+	s.ctr = new(counters)
+	ticker := time.NewTicker(time.Second)
+	go printCounters(ticker, s.ctr)
+
+	// Producer: Start buffering input to channel.
+	s.mod = mod
+	s.input = make(chan string, 10000)
+	go s.readLines(reader)
+
+	numr := runtime.GOMAXPROCS(-1)
+	s.cnq = make(chan rdf.NQuad, 10000)
+	done := make(chan error, numr)
+	wg := new(sync.WaitGroup)
+	for i := 0; i < numr; i++ {
+		wg.Add(1)
+		go s.parseStream(done) // Input --> NQuads
+		go s.handleNQuads(wg)  // NQuads --> Posting list [slow].
+	}
+
+	// Block until all parseStream goroutines finish.
+	for i := 0; i < numr; i++ {
+		if err := <-done; err != nil {
+			glog.WithError(err).Fatal("While reading input.")
+		}
+	}
+	close(s.cnq)
+	// Okay, we've stopped input to cnq, and closed it.
+	// Now wait for handleNQuads to finish.
+	wg.Wait()
+
+	ticker.Stop()
+	return atomic.LoadUint64(&s.ctr.processed), nil
+}
diff --git a/posting/lists.go b/posting/lists.go
index e445aa10..0ceb28c6 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -18,6 +18,7 @@ package posting
 
 import (
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -30,6 +31,25 @@ type entry struct {
 	l *List
 }
 
+type counters struct {
+	added  uint64
+	merged uint64
+}
+
+func (c *counters) periodicLog() {
+	for range time.Tick(time.Second) {
+		added := atomic.LoadUint64(&c.added)
+		merged := atomic.LoadUint64(&c.merged)
+		pending := added - merged
+
+		glog.WithFields(logrus.Fields{
+			"added":   added,
+			"merged":  merged,
+			"pending": pending,
+		}).Info("Merge counters")
+	}
+}
+
 var lmutex sync.RWMutex
 var lcache map[uint64]*entry
 var pstore *store.Store
@@ -43,9 +63,7 @@ func Init(posting *store.Store, log *commit.Logger) {
 	lcache = make(map[uint64]*entry)
 	pstore = posting
 	clog = log
-	ch = make(chan uint64, 1000)
-	go queueForProcessing()
-	go process()
+	ch = make(chan uint64, 10000)
 }
 
 func get(k uint64) *List {
@@ -82,50 +100,75 @@ func Get(key []byte) *List {
 	return e.l
 }
 
-func queueForProcessing() {
-	ticker := time.NewTicker(time.Minute)
-	for _ = range ticker.C {
-		count := 0
-		skipped := 0
-		lmutex.RLock()
-		now := time.Now()
-		for eid, e := range lcache {
-			if len(ch) >= cap(ch) {
-				break
-			}
-			if len(ch) < int(0.3*float32(cap(ch))) && e.l.IsDirty() {
-				// Let's add some work here.
-				ch <- eid
-				count += 1
-			} else if now.Sub(e.l.LastCompactionTs()) > 10*time.Minute {
-				// Only queue lists which haven't been processed for a while.
-				ch <- eid
-				count += 1
-			} else {
-				skipped += 1
-			}
+func queueForProcessing(c *counters) {
+	lmutex.RLock()
+	for eid, e := range lcache {
+		if len(ch) >= cap(ch) {
+			break
+		}
+		if e.l.IsDirty() {
+			ch <- eid
+			atomic.AddUint64(&c.added, 1)
 		}
-		lmutex.RUnlock()
-		glog.WithFields(logrus.Fields{
-			"added":   count,
-			"skipped": skipped,
-			"pending": len(ch),
-		}).Info("Added for compaction")
 	}
+	lmutex.RUnlock()
 }
 
-func process() {
-	ticker := time.NewTicker(100 * time.Millisecond)
+func periodicQueueForProcessing(c *counters) {
+	ticker := time.NewTicker(time.Minute)
 	for _ = range ticker.C {
-		eid := <-ch // blocking.
+		queueForProcessing(c)
+	}
+}
+
+func process(c *counters, wg *sync.WaitGroup) {
+	for eid := range ch {
 		l := get(eid)
 		if l == nil {
 			continue
 		}
-		glog.WithField("eid", eid).WithField("pending", len(ch)).
-			Info("Commiting list")
+		atomic.AddUint64(&c.merged, 1)
 		if err := l.MergeIfDirty(); err != nil {
 			glog.WithError(err).Error("While commiting dirty list.")
 		}
 	}
+	if wg != nil {
+		wg.Done()
+	}
+}
+
+func periodicProcess(c *counters) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	for range ticker.C {
+		process(c, nil)
+	}
+}
+
+func queueAll(c *counters) {
+	lmutex.RLock()
+	for hid := range lcache {
+		ch <- hid
+		atomic.AddUint64(&c.added, 1)
+	}
+	close(ch)
+	lmutex.RUnlock()
+}
+
+func StartPeriodicMerging() {
+	ctr := new(counters)
+	go periodicQueueForProcessing(ctr)
+	go periodicProcess(ctr)
+}
+
+func MergeLists(numRoutines int) {
+	c := new(counters)
+	go c.periodicLog()
+	go queueAll(c)
+
+	wg := new(sync.WaitGroup)
+	for i := 0; i < numRoutines; i++ {
+		wg.Add(1)
+		go process(c, wg)
+	}
+	wg.Wait()
 }
diff --git a/server/main.go b/server/main.go
index f8582fd9..880bf99b 100644
--- a/server/main.go
+++ b/server/main.go
@@ -17,168 +17,26 @@
 package main
 
 import (
-	"bufio"
-	"compress/gzip"
 	"flag"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"net/http"
-	"os"
 	"runtime"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
 	"github.com/dgraph-io/dgraph/gql"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/query"
-	"github.com/dgraph-io/dgraph/rdf"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/x"
-	"github.com/dgryski/go-farm"
 )
 
-var glog = x.Log("rdf")
+var glog = x.Log("server")
 
 var postingDir = flag.String("postings", "", "Directory to store posting lists")
 var mutationDir = flag.String("mutations", "", "Directory to store mutations")
-var rdfGzips = flag.String("rdfgzips", "",
-	"Comma separated gzip files containing RDF data")
-var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
 var port = flag.String("port", "8080", "Port to run server on.")
-var numgo = flag.Int("numgo", 4,
-	"Number of goroutines to use for reading file.")
-
-type counters struct {
-	read      uint64
-	parsed    uint64
-	processed uint64
-	ignored   uint64
-}
-
-func printCounters(ticker *time.Ticker, c *counters) {
-	for _ = range ticker.C {
-		glog.WithFields(logrus.Fields{
-			"read":      atomic.LoadUint64(&c.read),
-			"parsed":    atomic.LoadUint64(&c.parsed),
-			"processed": atomic.LoadUint64(&c.processed),
-			"ignored":   atomic.LoadUint64(&c.ignored),
-		}).Info("Counters")
-	}
-}
-
-func readLines(r io.Reader, input chan string, c *counters) {
-	scanner := bufio.NewScanner(r)
-	for scanner.Scan() {
-		input <- scanner.Text()
-		atomic.AddUint64(&c.read, 1)
-	}
-	if err := scanner.Err(); err != nil {
-		glog.WithError(err).Fatal("While reading file.")
-	}
-	close(input)
-}
-
-func parseStream(input chan string, cnq chan rdf.NQuad,
-	done chan error, c *counters) {
-
-	for line := range input {
-		line = strings.Trim(line, " \t")
-		if len(line) == 0 {
-			glog.Info("Empty line.")
-			continue
-		}
-
-		glog.Debugf("Got line: %q", line)
-		nq, err := rdf.Parse(line)
-		if err != nil {
-			x.Err(glog, err).Errorf("While parsing: %q", line)
-			done <- err
-			return
-		}
-		cnq <- nq
-		atomic.AddUint64(&c.parsed, 1)
-	}
-	done <- nil
-}
-
-func handleNQuads(cnq chan rdf.NQuad, ctr *counters, wg *sync.WaitGroup) {
-	for nq := range cnq {
-		if farm.Fingerprint64([]byte(nq.Subject))%*mod != 0 {
-			// Ignore due to mod sampling.
-			atomic.AddUint64(&ctr.ignored, 1)
-			continue
-		}
-
-		edge, err := nq.ToEdge()
-		if err != nil {
-			x.Err(glog, err).WithField("nq", nq).Error("While converting to edge")
-			return
-		}
-
-		key := posting.Key(edge.Entity, edge.Attribute)
-		plist := posting.Get(key)
-		plist.AddMutation(edge, posting.Set)
-		atomic.AddUint64(&ctr.processed, 1)
-	}
-	wg.Done()
-}
-
-// Blocking function.
-func handleRdfReader(reader io.Reader) (uint64, error) {
-	ctr := new(counters)
-	ticker := time.NewTicker(time.Second)
-	go printCounters(ticker, ctr)
-
-	// Producer: Start buffering input to channel.
-	input := make(chan string, 10000)
-	go readLines(reader, input, ctr)
-
-	cnq := make(chan rdf.NQuad, 10000)
-	done := make(chan error, *numgo)
-	wg := new(sync.WaitGroup)
-	for i := 0; i < *numgo; i++ {
-		wg.Add(1)
-		go parseStream(input, cnq, done, ctr) // Input --> NQuads
-		go handleNQuads(cnq, ctr, wg)         // NQuads --> Posting list
-	}
-
-	// The following will block until all ParseStream goroutines finish.
-	for i := 0; i < *numgo; i++ {
-		if err := <-done; err != nil {
-			glog.WithError(err).Fatal("While reading input.")
-		}
-	}
-	close(cnq)
-	// Okay, we've stopped input to cnq, and closed it.
-	// Now wait for handleNQuads to finish.
-	wg.Wait()
-
-	// We're doing things in this complicated way, because generating
-	// string -> NQuad is the slower task here.
-	ticker.Stop()
-	return atomic.LoadUint64(&ctr.processed), nil
-}
-
-func rdfHandler(w http.ResponseWriter, r *http.Request) {
-	if r.Method != "POST" {
-		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
-		return
-	}
-
-	defer r.Body.Close()
-	count, err := handleRdfReader(r.Body)
-	if err != nil {
-		x.SetStatus(w, x.E_ERROR, err.Error())
-		return
-	}
-	glog.WithField("count", count).Debug("RDFs parsed")
-	x.SetStatus(w, x.E_OK, fmt.Sprintf("%d RDFs parsed", count))
-}
 
 func queryHandler(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "POST" {
@@ -230,43 +88,17 @@ func main() {
 
 	ps := new(store.Store)
 	ps.Init(*postingDir)
+	defer ps.Close()
+
 	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SkipWrite = true // Don't write to commit logs.
+	clog.SkipWrite = false
+	clog.SyncEvery = 1
 	clog.Init()
 	defer clog.Close()
-	posting.Init(ps, clog)
 
-	if len(*rdfGzips) > 0 {
-		files := strings.Split(*rdfGzips, ",")
-		for _, path := range files {
-			if len(path) == 0 {
-				continue
-			}
-			glog.WithField("path", path).Info("Handling...")
-			f, err := os.Open(path)
-			if err != nil {
-				glog.WithError(err).Fatal("Unable to open rdf file.")
-			}
-
-			r, err := gzip.NewReader(f)
-			if err != nil {
-				glog.WithError(err).Fatal("Unable to create gzip reader.")
-			}
-
-			count, err := handleRdfReader(r)
-			if err != nil {
-				glog.Fatal(err)
-			}
-			glog.WithField("count", count).Info("RDFs parsed")
-			r.Close()
-			f.Close()
-		}
-	}
-	// Okay, now start accumulating mutations.
-	clog.SkipWrite = false
-	clog.SyncEvery = 1
+	posting.Init(ps, clog)
+	posting.StartPeriodicMerging()
 
-	http.HandleFunc("/rdf", rdfHandler)
 	http.HandleFunc("/query", queryHandler)
 	glog.WithField("port", *port).Info("Listening for requests...")
 	if err := http.ListenAndServe(":"+*port, nil); err != nil {
diff --git a/server/main_test.go b/server/main_test.go
index 725dc40c..d1038d6c 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/dgraph-io/dgraph/commit"
 	"github.com/dgraph-io/dgraph/gql"
+	"github.com/dgraph-io/dgraph/loader"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/query"
 	"github.com/dgraph-io/dgraph/store"
@@ -65,7 +66,7 @@ func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) {
 		return dir1, dir2, clog, err
 	}
 	defer f.Close()
-	_, err = handleRdfReader(f)
+	_, err = loader.HandleRdfReader(f, 1)
 	if err != nil {
 		return dir1, dir2, clog, err
 	}
-- 
GitLab