From 06701f5f89b6c381cc69818158dbd7ee0a85cd12 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Mon, 23 Nov 2015 17:54:07 +1100
Subject: [PATCH] Use the cache for streaming entries instead of opening files
 for each posting list. Use an iterator callback instead of channels while
 streaming entries.
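
Call sites now pass a LogIterator callback instead of draining a data
channel and then waiting on a done channel. A minimal sketch of the new
usage (the handleRecord helper is illustrative, not part of this patch):

    err := clog.StreamEntries(afterTs, hash, func(record []byte) {
        handleRecord(record)
    })
    if err != nil {
        glog.WithError(err).Error("While streaming entries.")
    }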

---
 commit/cache.go       |  18 ++++++-
 commit/log.go         | 118 +++++++++++++++++++++++++++---------------
 commit/log_test.go    |  22 +++-----
 gql/parser_test.go    |   3 +-
 posting/list.go       |  10 ++--
 query/query_test.go   |  20 +++----
 server/loader/main.go |   2 +-
 server/main.go        |   2 +-
 8 files changed, 117 insertions(+), 78 deletions(-)

diff --git a/commit/cache.go b/commit/cache.go
index 2a05d5d4..6fdbe746 100644
--- a/commit/cache.go
+++ b/commit/cache.go
@@ -21,16 +21,22 @@ import (
 	"io"
 	"io/ioutil"
 	"sync"
+	"sync/atomic"
+	"time"
 )
 
+var E_READ = errors.New("Unable to read")
 var E_WRITE = errors.New("Unable to write")
 
 type Cache struct {
 	sync.RWMutex
-	buf []byte
+	buf        []byte
+	lastAccess int64
 }
 
 func (c *Cache) Write(p []byte) (n int, err error) {
+	atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
+
 	c.Lock()
 	defer c.Unlock()
 	c.buf = append(c.buf, p...)
@@ -38,6 +44,8 @@ func (c *Cache) Write(p []byte) (n int, err error) {
 }
 
 func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) {
+	atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
+
 	c.RLock()
 	defer c.RUnlock()
 
@@ -46,9 +54,17 @@ func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) {
 	}
 
 	n = copy(p, c.buf[pos:])
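+	// A short copy means the read extends past the cached bytes.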
+	if n < len(p) {
+		return n, E_READ
+	}
 	return n, nil
 }
 
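+// LastAccessedInSeconds returns the number of seconds since this cache
+// was last read or written.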
+func (c *Cache) LastAccessedInSeconds() int64 {
+	d := atomic.LoadInt64(&c.lastAccess)
+	return (time.Now().UnixNano() - d) / int64(time.Second)
+}
+
 // Reader isn't thread-safe. But multiple readers can be used to read the
 // same cache.
 type Reader struct {
diff --git a/commit/log.go b/commit/log.go
index ba1b75d7..30583308 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -23,13 +23,13 @@
 package commit
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/binary"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
+	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
@@ -52,6 +52,25 @@ type logFile struct {
 	cache *Cache
 }
 
+func (lf *logFile) Cache() *Cache {
+	lf.RLock()
+	defer lf.RUnlock()
+	return lf.cache
+}
+
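+// FillIfEmpty populates the log file's cache from disk, unless it is
+// already loaded.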
+func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) {
+	lf.Lock()
+	defer lf.Unlock()
+	defer wg.Done()
+	if lf.cache != nil {
+		return
+	}
+	lf.cache = new(Cache)
+	if err := FillCache(lf.cache, lf.path); err != nil {
+		glog.WithError(err).WithField("path", lf.path).Fatal("Unable to fill cache.")
+	}
+}
+
 type CurFile struct {
 	sync.RWMutex
 	f         *os.File
@@ -61,6 +80,11 @@ type CurFile struct {
 }
 
 func (c *CurFile) cache() *Cache {
+	if c == nil {
+		// This got triggered due to a premature cleanup in query_test.go.
+		// Print the stack before the dereference below panics.
+		debug.PrintStack()
+	}
+
 	v := atomic.LoadPointer(&c.cch)
 	if v == nil {
 		return nil
@@ -137,6 +161,19 @@ func (l *Logger) updateLastLogTs(val int64) {
 	}
 }
 
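+// DeleteCacheOlderThan evicts the cache of every log file that has not
+// been accessed within the given duration.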
+func (l *Logger) DeleteCacheOlderThan(v time.Duration) {
+	l.RLock()
+	defer l.RUnlock()
+	s := int64(v.Seconds())
+	for _, lf := range l.list {
+		cache := lf.Cache()
+		if cache == nil {
+			// Cache already evicted; nothing to do.
+			continue
+		}
+		if cache.LastAccessedInSeconds() > s {
+			lf.Lock()
+			lf.cache = nil
+			lf.Unlock()
+		}
+	}
+}
+
 func (l *Logger) periodicSync() {
 	glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
 	if l.SyncDur == 0 {
@@ -447,87 +484,82 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 	return nil
 }
 
-func streamEntriesInFile(path string,
-	afterTs int64, hash uint32, ch chan []byte) error {
-
-	flog := glog.WithField("path", path)
-	f, err := os.Open(path)
-	if err != nil {
-		flog.WithError(err).Error("While opening file.")
-		return err
-	}
-	defer f.Close()
+func streamEntries(cache *Cache,
+	afterTs int64, hash uint32, iter LogIterator) error {
 
-	discard := make([]byte, 4096)
-	reader := bufio.NewReaderSize(f, 5<<20)
+	flog := glog
+	reader := NewReader(cache)
 	header := make([]byte, 16)
 	for {
-		n, err := reader.Read(header)
+		_, err := reader.Read(header)
 		if err == io.EOF {
-			flog.Debug("File read complete.")
+			flog.Debug("Cache read complete.")
 			break
 		}
-		if n != len(header) {
-			flog.WithField("n", n).Fatal("Unable to read header.")
-		}
 		if err != nil {
-			flog.WithError(err).Error("While reading header.")
+			flog.WithError(err).Fatal("While reading header.")
 			return err
 		}
+
 		hdr, err := parseHeader(header)
 		if err != nil {
 			flog.WithError(err).Error("While parsing header.")
 			return err
 		}
+
 		if hdr.hash == hash && hdr.ts >= afterTs {
 			data := make([]byte, hdr.size)
-			n, err := reader.Read(data)
+			_, err := reader.Read(data)
 			if err != nil {
-				flog.WithError(err).Error("While reading data.")
+				flog.WithError(err).Fatal("While reading data.")
 				return err
 			}
-			if int32(n) != hdr.size {
-				flog.WithField("n", n).Fatal("Unable to read data.")
-			}
-			ch <- data
+			iter(data)
 
 		} else {
-			for int(hdr.size) > len(discard) {
-				discard = make([]byte, len(discard)*2)
-			}
-			reader.Read(discard[:int(hdr.size)])
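+			// Record doesn't match; skip its payload.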
+			reader.Discard(int(hdr.size))
 		}
 	}
 	return nil
 }
 
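+// LogIterator is invoked for each streamed record that matches the filter.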
+type LogIterator func(record []byte)
+
-// Always run this method in it's own goroutine. Otherwise, your program
-// will just hang waiting on channels.
+// StreamEntries runs synchronously in the caller's goroutine; with the
+// iterator API there are no channels left to hang on.
 func (l *Logger) StreamEntries(afterTs int64, hash uint32,
-	ch chan []byte, done chan error) {
+	iter LogIterator) error {
 
-	var paths []string
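+	// First pass: concurrently fill the caches of all log files that may
+	// contain entries after afterTs.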
+	var wg sync.WaitGroup
 	l.RLock()
 	for _, lf := range l.list {
 		if afterTs < lf.endTs {
-			paths = append(paths, lf.path)
+			wg.Add(1)
+			go lf.FillIfEmpty(&wg)
 		}
 	}
 	l.RUnlock()
+	wg.Wait()
 
-	{
-		cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
-		if _, err := os.Stat(cur); err == nil {
-			paths = append(paths, cur)
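+	// Second pass: collect the filled caches under the read lock.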
+	l.RLock()
+	var caches []*Cache
+	for _, lf := range l.list {
+		if afterTs < lf.endTs {
+			caches = append(caches, lf.Cache())
 		}
 	}
-	for _, path := range paths {
-		if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil {
-			close(ch)
-			done <- err
-			return
+	l.RUnlock()
+
+	// Add current cache.
+	caches = append(caches, l.curFile().cache())
+	for _, cache := range caches {
+		if cache == nil {
+			glog.Error("Cache is nil")
+			continue
+		}
+		if err := streamEntries(cache, afterTs, hash, iter); err != nil {
+			return err
 		}
 	}
-	close(ch)
-	done <- nil
+	return nil
 }
diff --git a/commit/log_test.go b/commit/log_test.go
index ad2e1e52..c0e2ab6b 100644
--- a/commit/log_test.go
+++ b/commit/log_test.go
@@ -211,17 +211,14 @@ func TestReadEntries(t *testing.T) {
 
 	{
 		// Check for hash = 1, ts >= 2.
-		ch := make(chan []byte, 10)
-		done := make(chan error)
-		go l.StreamEntries(ts+2, uint32(1), ch, done)
 		count := 0
-		for val := range ch {
+		err := l.StreamEntries(ts+2, uint32(1), func(entry []byte) {
 			count += 1
-			if bytes.Compare(data, val) != 0 {
+			if !bytes.Equal(data, entry) {
 				t.Error("Data doesn't equate.")
 			}
-		}
-		if err := <-done; err != nil {
+		})
+		if err != nil {
 			t.Error(err)
 		}
 		if count != 2 {
@@ -233,17 +230,14 @@ func TestReadEntries(t *testing.T) {
 			t.Error(err)
 		}
 		// Check for hash = 1, ts >= 2.
-		ch := make(chan []byte, 10)
-		done := make(chan error)
-		go l.StreamEntries(ts, uint32(1), ch, done)
 		count := 0
-		for val := range ch {
+		err := l.StreamEntries(ts, uint32(1), func(entry []byte) {
 			count += 1
-			if bytes.Compare(data, val) != 0 {
+			if !bytes.Equal(data, entry) {
 				t.Error("Data doesn't equate.")
 			}
-		}
-		if err := <-done; err != nil {
+		})
+		if err != nil {
 			t.Error(err)
 		}
 		if count != 4 {
diff --git a/gql/parser_test.go b/gql/parser_test.go
index ab17f0a2..b9cf369d 100644
--- a/gql/parser_test.go
+++ b/gql/parser_test.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"testing"
 
-	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/query"
 )
 
@@ -77,7 +76,7 @@ func TestParse(t *testing.T) {
 }
 
 func TestParseXid(t *testing.T) {
-	logrus.SetLevel(logrus.DebugLevel)
+	// logrus.SetLevel(logrus.DebugLevel)
 	query := `
 	query {
 		user(_uid_: 0x11) {
diff --git a/posting/list.go b/posting/list.go
index af53ede6..a1b6a29a 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -242,12 +242,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
 	l.hash = farm.Fingerprint32(key)
 	l.mlayer = make(map[int]types.Posting)
 
-	ch := make(chan []byte, 100)
-	done := make(chan error)
 	glog.Debug("Starting stream entries...")
-	go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done)
-
-	for buffer := range ch {
+	err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) {
 		uo := flatbuffers.GetUOffsetT(buffer)
 		m := new(types.Posting)
 		m.Init(buffer, uo)
@@ -260,8 +256,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
 			"ts":     m.Ts(),
 		}).Debug("Got entry from log")
 		l.mergeMutation(m)
-	}
-	if err := <-done; err != nil {
+	})
+	if err != nil {
 		glog.WithError(err).Error("While streaming entries.")
 	}
 	glog.Debug("Done streaming entries.")
diff --git a/query/query_test.go b/query/query_test.go
index e60c3084..549127bb 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -23,7 +23,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
@@ -190,21 +189,19 @@ func TestNewGraph(t *testing.T) {
 	}
 }
 
-func populateGraph(t *testing.T) {
-	logrus.SetLevel(logrus.DebugLevel)
+func populateGraph(t *testing.T) string {
+	// logrus.SetLevel(logrus.DebugLevel)
 	dir, err := ioutil.TempDir("", "storetest_")
 	if err != nil {
 		t.Error(err)
-		return
+		return ""
 	}
 
-	defer os.RemoveAll(dir)
 	ps := new(store.Store)
 	ps.Init(dir)
 
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
-	defer clog.Close()
 	posting.Init(ps, clog)
 
 	// So, user we're interested in has uid: 1.
@@ -250,10 +247,13 @@ func populateGraph(t *testing.T) {
 
 	edge.Value = "Andrea"
 	addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
+
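+	// The caller is responsible for removing the returned dir.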
+	return dir
 }
 
 func TestProcessGraph(t *testing.T) {
-	populateGraph(t)
+	dir := populateGraph(t)
+	defer os.RemoveAll(dir)
 
 	// Alright. Now we have everything set up. Let's create the query.
 	sg, err := NewGraph(1, "")
@@ -346,7 +346,8 @@ func TestProcessGraph(t *testing.T) {
 }
 
 func TestToJson(t *testing.T) {
-	populateGraph(t)
+	dir := populateGraph(t)
+	defer os.RemoveAll(dir)
 
 	// Alright. Now we have everything set up. Let's create the query.
 	sg, err := NewGraph(1, "")
@@ -379,7 +380,8 @@ func TestToJson(t *testing.T) {
 		t.Error(err)
 	}
 
-	js, err := sg.ToJson()
+	var l Latency
+	js, err := sg.ToJson(&l)
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/server/loader/main.go b/server/loader/main.go
index 7cb23237..2390f484 100644
--- a/server/loader/main.go
+++ b/server/loader/main.go
@@ -72,7 +72,7 @@ func main() {
 	defer ps.Close()
 
 	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SkipWrite = true // Don't write to commit logs.
+	clog.SetSkipWrite(true) // Don't write to commit logs.
 	clog.Init()
 	defer clog.Close()
 	posting.Init(ps, clog)
diff --git a/server/main.go b/server/main.go
index e502c161..1fc3b65d 100644
--- a/server/main.go
+++ b/server/main.go
@@ -107,7 +107,7 @@ func main() {
 	defer ps.Close()
 
 	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SkipWrite = false
+	clog.SetSkipWrite(false)
 	clog.SyncEvery = 1
 	clog.Init()
 	defer clog.Close()
-- 
GitLab