From ca369823b241dc5c7fc7014b9a320858ed4da504 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Fri, 20 Nov 2015 15:53:50 +1100
Subject: [PATCH] Loaded 21M edges: all film data plus names. Ran a complex
 query with 60ms latency!

---
 commit/log.go         | 20 ++++++++++++++++++--
 notes.txt             |  7 +++++++
 posting/list.go       | 26 ++++++++++++++++++++++++++
 posting/lists.go      |  6 +++++-
 server/loader/main.go |  7 +++++--
 server/main.go        | 23 +++++++++++++++++++----
 server/notes.txt      | 32 ++++++++++++++++++++++++++++++++
 7 files changed, 112 insertions(+), 9 deletions(-)
 create mode 100644 notes.txt
 create mode 100644 server/notes.txt

diff --git a/commit/log.go b/commit/log.go
index fc478310..46bbc1f4 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -243,6 +243,8 @@ func lastTimestamp(path string) (int64, error) {
 		return 0, err
 	}
 
+	// Scratch buffer, reused below to skip over record bodies.
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 2<<20)
 	var maxTs int64
 	header := make([]byte, 16)
@@ -277,7 +279,13 @@ func lastTimestamp(path string) (int64, error) {
 			}).Fatal("Log file doesn't have monotonically increasing records.")
 		}
 
-		reader.Discard(int(h.size))
+		for int(h.size) > len(discard) {
+			discard = make([]byte, len(discard)*2)
+		}
+		// bufio's Read can return fewer bytes than asked for; io.ReadFull skips the whole body.
+		if _, err := io.ReadFull(reader, discard[:int(h.size)]); err != nil {
+			return 0, err
+		}
 	}
 	return maxTs, nil
 }
@@ -396,6 +404,8 @@ func streamEntriesInFile(path string,
 	}
 	defer f.Close()
 
+	// Scratch buffer, reused below to skip over record bodies.
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 5<<20)
 	header := make([]byte, 16)
 	for {
@@ -429,7 +439,13 @@ func streamEntriesInFile(path string,
 			ch <- data
 
 		} else {
-			reader.Discard(int(hdr.size))
+			for int(hdr.size) > len(discard) {
+				discard = make([]byte, len(discard)*2)
+			}
+			// Guard against short reads while skipping the record body.
+			if _, err := io.ReadFull(reader, discard[:int(hdr.size)]); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
diff --git a/notes.txt b/notes.txt
new file mode 100644
index 00000000..cf94b512
--- /dev/null
+++ b/notes.txt
@@ -0,0 +1,7 @@
+Have to increase the open-file limit on Ubuntu to avoid the "too many open files" error, probably caused by goroutines opening up RocksDB SST files.
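+
+For example (the numbers below are a guess; tune them for your machine):
+  ulimit -n 65535      # raises the limit for the current shell only
+  # For a persistent limit, add lines like these to /etc/security/limits.conf:
+  #   <user>  soft  nofile  65535
+  #   <user>  hard  nofile  65535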
diff --git a/posting/list.go b/posting/list.go
index bbca733f..af53ede6 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -85,6 +85,23 @@ func (pa ByUid) Len() int           { return len(pa) }
 func (pa ByUid) Swap(i, j int)      { pa[i], pa[j] = pa[j], pa[i] }
 func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }
 
+// samePosting reports whether two postings have the same uid, value and source.
+func samePosting(a *types.Posting, b *types.Posting) bool {
+	if a.Uid() != b.Uid() {
+		return false
+	}
+	if a.ValueLength() != b.ValueLength() {
+		return false
+	}
+	if !bytes.Equal(a.ValueBytes(), b.ValueBytes()) {
+		return false
+	}
+	if !bytes.Equal(a.Source(), b.Source()) {
+		return false
+	}
+	return true
+}
+
 // key = (entity uid, attribute)
 func Key(uid uint64, attr string) []byte {
 	buf := new(bytes.Buffer)
@@ -455,6 +472,15 @@ func (l *List) mergeMutation(mp *types.Posting) {
 
 		} else { // curUid not found in mindex.
 			if inPlist { // In plist, so just set it in mlayer.
+				// If this posting matches what we already have in posting list,
+				// we don't need to `dirty` this by adding to mlayer.
+				plist := l.getPostingList()
+				var cp types.Posting
+				if ok := plist.Postings(&cp, pi); ok {
+					if samePosting(&cp, mp) {
+						return // do nothing.
+					}
+				}
 				l.mlayer[pi] = *mp
 
 			} else { // not in plist, not in mindex, so insert in mindex.
diff --git a/posting/lists.go b/posting/lists.go
index 0edf3820..58e1e431 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -19,6 +19,7 @@ package posting
 import (
 	"math/rand"
 	"runtime"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,7 +58,7 @@ var MIB uint64
 
 func checkMemoryUsage() {
 	MIB = 1 << 20
-	MAX_MEMORY = 2 * (1 << 30)
+	MAX_MEMORY = 3 * (1 << 30)
 
 	for _ = range time.Tick(5 * time.Second) {
 		var ms runtime.MemStats
@@ -78,6 +79,9 @@ func checkMemoryUsage() {
 
 		glog.Info("Merged lists. Calling GC.")
 		runtime.GC() // Call GC to do some cleanup.
+		glog.Info("Trying to free OS memory")
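+		// debug.FreeOSMemory forces a GC and asks the runtime to return as much freed memory to the OS as possible.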
+		debug.FreeOSMemory()
 
 		runtime.ReadMemStats(&ms)
 		megs = ms.Alloc / MIB
diff --git a/server/loader/main.go b/server/loader/main.go
index 70f662de..7cb23237 100644
--- a/server/loader/main.go
+++ b/server/loader/main.go
@@ -58,8 +58,11 @@ func main() {
 	}
 
 	logrus.SetLevel(logrus.InfoLevel)
-	numCpus := runtime.GOMAXPROCS(-1)
-	glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prevProcs := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prevProcs).
+		Info("Set max procs to num cpus")
 
 	if len(*rdfGzips) == 0 {
 		glog.Fatal("No RDF GZIP files specified")
diff --git a/server/main.go b/server/main.go
index 35d94a16..ec9d02a3 100644
--- a/server/main.go
+++ b/server/main.go
@@ -22,6 +22,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"runtime"
+	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
@@ -51,14 +52,16 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query received.")
+	glog.WithField("q", string(q)).Debug("Query received.")
+	now := time.Now()
 	sg, err := gql.Parse(string(q))
 	if err != nil {
 		x.Err(glog, err).Error("While parsing query")
 		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query parsed.")
+	parseLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Query parsed.")
 	rch := make(chan error)
 	go query.ProcessGraph(sg, rch)
 	err = <-rch
@@ -67,13 +70,21 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Graph processed.")
+	processLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Graph processed.")
 	js, err := sg.ToJson()
 	if err != nil {
 		x.Err(glog, err).Error("While converting to Json.")
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
+	jsonLat := time.Since(now)
+	glog.WithFields(logrus.Fields{
+		"total":   jsonLat,
+		"parsing": parseLat,
+		"process": processLat - parseLat,
+		"json":    jsonLat - processLat,
+	}).Info("Query Latencies")
 	w.Header().Set("Content-Type", "application/json")
 	fmt.Fprint(w, string(js))
 }
@@ -84,7 +95,11 @@ func main() {
 		glog.Fatal("Unable to parse flags")
 	}
 	logrus.SetLevel(logrus.InfoLevel)
-	glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prev := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prev).
+		Info("Set max procs to num cpus")
 
 	ps := new(store.Store)
 	ps.Init(*postingDir)
diff --git a/server/notes.txt b/server/notes.txt
new file mode 100644
index 00000000..51be067b
--- /dev/null
+++ b/server/notes.txt
@@ -0,0 +1,32 @@
+curl localhost:8080/query -XPOST -d '{
+me(_xid_: m.06pj8) {
+        type.object.name.en
+        film.director.film {
+                type.object.name.en
+                film.film.starring {
+                        film.performance.actor {
+                                film.director.film {
+                                        type.object.name.en
+                                }
+                                type.object.name.en
+                        }
+                }
+                film.film.initial_release_date
+                film.film.country
+                film.film.genre {
+                        type.object.name.en
+                }
+        }
+}
+}' > output.json
+
+INFO[0554] Query Latencies                               json=35.434904ms package=server parsing=93.385µs process=24.785928ms total=60.314523ms
+
+The first run of this query is slower, taking around 250ms in processing, mostly because the posting lists have to
+be loaded from RocksDB, which means disk seeks. On subsequent runs, with the PLs already in memory, processing takes
+under 25ms. It actually takes more time to convert the results to JSON than to process the query!
+
+My reactions are mixed. I'm happy, but unsatisfied.
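+
+A quick way to eyeball the JSON output, assuming jq is installed:
+  jq . output.json | less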
-- 
GitLab