From 68ea2d1581a2e533ba000e6fe9c4b88fce6c2ad5 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Tue, 1 Mar 2016 18:00:29 +1100
Subject: [PATCH] Add memory profiling. Use a sync.Pool for lexer. Add
 benchmark numbers before and after using sync.Pool. Simplify assignUidsOnly.

---
 lex/lexer.go                  | 14 ++++++---
 loader/loader.go              | 34 +++++++++++++---------
 posting/list.go               |  3 +-
 rdf/README.txt                | 53 +++++++++++++++++++++++++++++++++++
 rdf/parse.go                  |  2 ++
 server/uidassigner/README.txt | 17 +++++++++++
 server/uidassigner/main.go    | 10 +++++++
 7 files changed, 114 insertions(+), 19 deletions(-)
 create mode 100644 rdf/README.txt
 create mode 100644 server/uidassigner/README.txt

diff --git a/lex/lexer.go b/lex/lexer.go
index 9027e110..00cebc2d 100644
--- a/lex/lexer.go
+++ b/lex/lexer.go
@@ -18,12 +18,18 @@ package lex
 
 import (
 	"fmt"
+	"sync"
 	"unicode/utf8"
 
 	"github.com/dgraph-io/dgraph/x"
 )
 
 var glog = x.Log("lexer")
+var LexerPool = sync.Pool{
+	New: func() interface{} {
+		return &Lexer{}
+	},
+}
 
 const EOF = -1
 
@@ -67,10 +73,10 @@ type Lexer struct {
 }
 
 func NewLexer(input string) *Lexer {
-	l := &Lexer{
-		Input: input,
-		Items: make(chan item, 100),
-	}
+	l := LexerPool.Get().(*Lexer)
+	*l = Lexer{}
+	l.Input = input
+	l.Items = make(chan item, 5)
 	return l
 }
 
diff --git a/loader/loader.go b/loader/loader.go
index f9beb446..aa46cd22 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -158,7 +158,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 	wg.Done()
 }
 
-func (s *state) getUidForString(str string) {
+func (s *state) getUidForString(str string) error {
 	_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
 	for err != nil {
 		// Just put in a retry loop to tackle temporary errors.
@@ -169,31 +169,39 @@ func (s *state) getUidForString(str string) {
 		} else {
 			glog.WithError(err).WithField("nq.Subject", str).
 				Error("While getting UID")
-			return
+			return err
 		}
 		_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
 	}
+	return nil
 }
 
 func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
+	defer wg.Done()
+
 	for nq := range s.cnq {
-		if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx {
-			// This instance shouldnt assign UID to this string
-			atomic.AddUint64(&s.ctr.ignored, 1)
-		} else {
-			s.getUidForString(nq.Subject)
+		ignored := true
+		if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx {
+			if err := s.getUidForString(nq.Subject); err != nil {
+				glog.WithError(err).Fatal("While assigning Uid to subject.")
+			}
+			ignored = false
 		}
 
-		if len(nq.ObjectId) == 0 ||
-			farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx {
-			// This instance shouldnt or cant assign UID to this string
+		if len(nq.ObjectId) > 0 &&
+			farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx {
+			if err := s.getUidForString(nq.ObjectId); err != nil {
+				glog.WithError(err).Fatal("While assigning Uid to object.")
+			}
+			ignored = false
+		}
+
+		if ignored {
 			atomic.AddUint64(&s.ctr.ignored, 1)
 		} else {
-			s.getUidForString(nq.ObjectId)
+			atomic.AddUint64(&s.ctr.processed, 1)
 		}
 	}
-
-	wg.Done()
 }
 
 // Blocking function.
diff --git a/posting/list.go b/posting/list.go
index 31c859b2..cef683f1 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -39,12 +39,11 @@ import (
 )
 
 var glog = x.Log("posting")
+var E_TMP_ERROR = fmt.Errorf("Temporary Error. Please retry.")
 
 const Set = 0x01
 const Del = 0x02
 
-var E_TMP_ERROR = errors.New("Temporary Error. Please retry.")
-
 type buffer struct {
 	d []byte
 }
diff --git a/rdf/README.txt b/rdf/README.txt
new file mode 100644
index 00000000..deff864d
--- /dev/null
+++ b/rdf/README.txt
@@ -0,0 +1,53 @@
+Heap profile of uidassigner (--alloc_objects), taken before pooling Lexer objects:
+
+go tool pprof --alloc_objects uidassigner heap.prof
+
+(pprof) top10
+196427053 of 207887723 total (94.49%)
+Dropped 41 nodes (cum <= 1039438)
+Showing top 10 nodes out of 31 (cum >= 8566234)
+      flat  flat%   sum%        cum   cum%
+  55529704 26.71% 26.71%   55529704 26.71%  github.com/dgraph-io/dgraph/rdf.Parse
+  28255068 13.59% 40.30%   30647245 14.74%  github.com/dgraph-io/dgraph/posting.(*List).getPostingList
+  20406729  9.82% 50.12%   20406729  9.82%  github.com/zond/gotomic.newRealEntryWithHashCode
+  17777182  8.55% 58.67%   17777182  8.55%  strings.makeCutsetFunc
+  17582839  8.46% 67.13%   17706815  8.52%  github.com/dgraph-io/dgraph/loader.(*state).readLines
+  15139047  7.28% 74.41%   88445933 42.55%  github.com/dgraph-io/dgraph/loader.(*state).parseStream
+  12927366  6.22% 80.63%   12927366  6.22%  github.com/zond/gotomic.(*element).search
+  10789028  5.19% 85.82%   66411362 31.95%  github.com/dgraph-io/dgraph/posting.GetOrCreate
+   9453856  4.55% 90.37%    9453856  4.55%  github.com/zond/gotomic.(*hashHit).search
+   8566234  4.12% 94.49%    8566234  4.12%  github.com/dgraph-io/dgraph/uid.stringKey
+
+
+(pprof) list rdf.Parse
+Total: 207887723
+ROUTINE ======================== github.com/dgraph-io/dgraph/rdf.Parse in /home/mrjn/go/src/github.com/dgraph-io/dgraph/rdf/parse.go
+  55529704   55529704 (flat, cum) 26.71% of Total
+         .          .    118:	}
+         .          .    119:	return val[1 : len(val)-1]
+         .          .    120:}
+         .          .    121:
+         .          .    122:func Parse(line string) (rnq NQuad, rerr error) {
+  54857942   54857942    123:	l := lex.NewLexer(line)
+         .          .    124:	go run(l)
+         .          .    125:	var oval string
+         .          .    126:	var vend bool
+
+
+This showed that lex.NewLexer(...) was responsible for a large share of the memory allocations.
+So, let's reuse Lexer objects via a sync.Pool.
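+
+In short, the pooling pattern (condensed from the lex/lexer.go and rdf/parse.go
+changes in this patch) looks like:
+
+  var LexerPool = sync.Pool{
+      New: func() interface{} {
+          return &Lexer{}
+      },
+  }
+
+  func NewLexer(input string) *Lexer {
+      l := LexerPool.Get().(*Lexer) // reuse a previously allocated Lexer when possible
+      *l = Lexer{}                  // clear any leftover state before reuse
+      l.Input = input
+      l.Items = make(chan item, 5)
+      return l
+  }
+
+  // Callers hand the Lexer back to the pool once parsing is done, e.g. in rdf.Parse:
+  //   l := lex.NewLexer(line)
+  //   defer lex.LexerPool.Put(l)
+
+Note that the Lexer struct itself is now reused, but the Items channel is still
+created on every call, which is why lex.NewLexer does not disappear entirely from
+the profile below.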
+
+After using sync.Pool, this is the output:
+
+422808936 of 560381333 total (75.45%)
+Dropped 63 nodes (cum <= 2801906)
+Showing top 10 nodes out of 62 (cum >= 18180150)
+      flat  flat%   sum%        cum   cum%
+ 103445194 18.46% 18.46%  103445194 18.46%  github.com/Sirupsen/logrus.(*Entry).WithFields
+  65448918 11.68% 30.14%  163184489 29.12%  github.com/Sirupsen/logrus.(*Entry).WithField
+  48366300  8.63% 38.77%  203838187 36.37%  github.com/dgraph-io/dgraph/posting.(*List).get
+  39789719  7.10% 45.87%   49276181  8.79%  github.com/dgraph-io/dgraph/posting.(*List).getPostingList
+  36642638  6.54% 52.41%   36642638  6.54%  github.com/dgraph-io/dgraph/lex.NewLexer
+  35190301  6.28% 58.69%   35190301  6.28%  github.com/google/flatbuffers/go.(*Builder).growByteBuffer
+  31392455  5.60% 64.29%   31392455  5.60%  github.com/zond/gotomic.newRealEntryWithHashCode
+  25895676  4.62% 68.91%   25895676  4.62%  github.com/zond/gotomic.(*element).search
+  18546971  3.31% 72.22%   72863016 13.00%  github.com/dgraph-io/dgraph/loader.(*state).parseStream
+  18090764  3.23% 75.45%   18180150  3.24%  github.com/dgraph-io/dgraph/loader.(*state).readLines
diff --git a/rdf/parse.go b/rdf/parse.go
index 0a8c8f6a..2dfaf002 100644
--- a/rdf/parse.go
+++ b/rdf/parse.go
@@ -121,6 +121,8 @@ func stripBracketsIfPresent(val string) string {
 
 func Parse(line string) (rnq NQuad, rerr error) {
 	l := lex.NewLexer(line)
+	defer lex.LexerPool.Put(l)
+
 	go run(l)
 	var oval string
 	var vend bool
diff --git a/server/uidassigner/README.txt b/server/uidassigner/README.txt
new file mode 100644
index 00000000..30769517
--- /dev/null
+++ b/server/uidassigner/README.txt
@@ -0,0 +1,17 @@
+(pprof) top10
+4.19mins of 7.46mins total (56.23%)
+Dropped 569 nodes (cum <= 0.75mins)
+Showing top 10 nodes out of 25 (cum >= 2.78mins)
+      flat  flat%   sum%        cum   cum%
+  1.71mins 22.86% 22.86%   4.03mins 54.02%  runtime.scanobject
+  1.16mins 15.62% 38.48%   1.16mins 15.62%  runtime.heapBitsForObject
+  0.95mins 12.77% 51.25%   1.21mins 16.16%  runtime.greyobject
+  0.19mins  2.56% 53.81%   3.46mins 46.41%  runtime.mallocgc
+  0.07mins  0.88% 54.69%   1.08mins 14.53%  runtime.mapassign1
+  0.04mins  0.48% 55.17%   3.82mins 51.17%  runtime.systemstack
+  0.03mins  0.35% 55.52%   0.92mins 12.36%  runtime.gcDrain
+  0.02mins  0.26% 55.78%   1.42mins 18.98%  github.com/dgraph-io/dgraph/rdf.Parse
+  0.02mins  0.23% 56.01%   0.92mins 12.29%  runtime.newobject
+  0.02mins  0.22% 56.23%   2.78mins 37.25%  runtime.gcDrainN
+
+The above CPU profile was generated before using any `sync.Pool`.
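+
+For reference, the profile can be collected via the existing --cpuprofile flag in
+server/uidassigner/main.go and then inspected with pprof, roughly like this (the
+profile file name is just an example):
+
+  ./uidassigner --cpuprofile=cpu.prof <other flags>
+  go tool pprof uidassigner cpu.prof
+  (pprof) top10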
diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go
index 2e1546da..0a003824 100644
--- a/server/uidassigner/main.go
+++ b/server/uidassigner/main.go
@@ -27,6 +27,7 @@ var numInstances = flag.Uint64("numInstances", 1,
 var uidDir = flag.String("uidpostings", "",
 	"Directory to store xid to uid posting lists")
 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
+var memprofile = flag.String("memprofile", "", "write memory profile to file")
 var numcpu = flag.Int("numCpu", runtime.NumCPU(),
 	"Number of cores to be used by the process")
 
@@ -93,4 +94,13 @@ func main() {
 	}
 	glog.Info("Calling merge lists")
 	posting.MergeLists(100 * numCpus) // 100 per core.
+
+	if len(*memprofile) > 0 {
+		f, err := os.Create(*memprofile)
+		if err != nil {
+			glog.Fatal(err)
+		}
+		pprof.WriteHeapProfile(f)
+		f.Close()
+	}
 }
-- 
GitLab