From 2b6788b919f3f9db7f012923cdb9a82f48b00a06 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Mon, 23 Nov 2015 16:48:26 +1100
Subject: [PATCH] Move curfile into its own struct, and avoid holding global
 lock in logger as much as possible.

---
 commit/log.go      | 170 +++++++++++++++++++++++++++------------------
 commit/log_test.go |  27 +++----
 2 files changed, 116 insertions(+), 81 deletions(-)

diff --git a/commit/log.go b/commit/log.go
index 7141a583..ba1b75d7 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -36,6 +36,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/x"
@@ -48,6 +49,29 @@ type logFile struct {
 	endTs int64 // never modified after creation.
 	path  string
 	size  int64
+	cache *Cache
+}
+
+type CurFile struct {
+	sync.RWMutex
+	f         *os.File
+	size      int64
+	dirtyLogs int
+	cch       unsafe.Pointer // handled via atomics.
+}
+
+func (c *CurFile) cache() *Cache {
+	v := atomic.LoadPointer(&c.cch)
+	if v == nil {
+		return nil
+	}
+	return (*Cache)(v)
+}
+
+func (c *CurFile) Size() int64 {
+	c.RLock()
+	defer c.RUnlock()
+	return c.size
 }
 
 type ByTimestamp []*logFile
@@ -80,12 +104,16 @@ type Logger struct {
 	skipWrite int32
 
 	sync.RWMutex
-	list              []*logFile
-	curFile           *os.File
-	size              int64
-	lastLogTs         int64
-	logsSinceLastSync int
-	ticker            *time.Ticker
+	list      []*logFile
+	cf        *CurFile
+	lastLogTs int64 // handled via atomics.
+	ticker    *time.Ticker
+}
+
+func (l *Logger) curFile() *CurFile {
+	l.RLock()
+	defer l.RUnlock()
+	return l.cf
 }
 
 func (l *Logger) SetSkipWrite(val bool) {
@@ -109,11 +137,6 @@ func (l *Logger) updateLastLogTs(val int64) {
 	}
 }
 
-func (l *Logger) resetCounters() {
-	l.size = 0
-	l.logsSinceLastSync = 0
-}
-
 func (l *Logger) periodicSync() {
 	glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
 	if l.SyncDur == 0 {
@@ -123,18 +146,25 @@ func (l *Logger) periodicSync() {
 
 	l.ticker = time.NewTicker(l.SyncDur)
 	for _ = range l.ticker.C {
-		l.Lock()
-		if l.curFile != nil && l.logsSinceLastSync > 0 {
-			if err := l.curFile.Sync(); err != nil {
-				glog.WithError(err).Error("While periodically syncing.")
+		cf := l.curFile()
+		if cf == nil {
+			continue
+		}
+
+		{
+			cf.Lock()
+			if cf.dirtyLogs > 0 {
+				if err := cf.f.Sync(); err != nil {
+					glog.WithError(err).Error("While periodically syncing.")
+				} else {
+					cf.dirtyLogs = 0
+					glog.Debug("Successful periodic sync.")
+				}
 			} else {
-				l.logsSinceLastSync = 0
-				glog.Debug("Successful periodic sync.")
+				glog.Debug("Skipping periodic sync.")
 			}
-		} else {
-			glog.Debug("Skipping periodic sync.")
+			cf.Unlock()
 		}
-		l.Unlock()
 	}
 }
 
@@ -145,11 +175,11 @@ func (l *Logger) Close() {
 	if l.ticker != nil {
 		l.ticker.Stop()
 	}
-	if l.curFile != nil {
-		if err := l.curFile.Close(); err != nil {
+	if l.cf != nil {
+		if err := l.cf.f.Close(); err != nil {
 			glog.WithError(err).Error("While closing current file.")
 		}
-		l.curFile = nil
+		l.cf = nil
 	}
 }
 
@@ -205,16 +235,22 @@ func (l *Logger) Init() {
 		fi, err := os.Stat(path)
 		if err == nil {
 			// we have the file. Derive information for counters.
-			l.size = fi.Size()
-			l.logsSinceLastSync = 0
-			lastTs, err := lastTimestamp(path)
+			l.cf = new(CurFile)
+			l.cf.size = fi.Size()
+			l.cf.dirtyLogs = 0
+			cache := new(Cache)
+			if ferr := FillCache(cache, path); ferr != nil {
+				glog.WithError(ferr).Fatal("Unable to write to cache.")
+			}
+			atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
+			lastTs, err := lastTimestamp(cache)
 			if err != nil {
 				glog.WithError(err).Fatal("Unable to read last log timestamp.")
 			}
 			l.updateLastLogTs(lastTs)
 
 			// Open file for append.
-			l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
+			l.cf.f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
 				os.FileMode(0644))
 			if err != nil {
 				glog.WithError(err).Fatal("Unable to open current file in append mode.")
@@ -226,7 +262,7 @@ func (l *Logger) Init() {
 		glog.WithError(err).Fatal("While walking over directory")
 	}
 	sort.Sort(ByTimestamp(l.list))
-	if l.curFile == nil {
+	if l.cf == nil {
 		l.createNew()
 	}
 	go l.periodicSync()
@@ -257,17 +293,9 @@ func parseHeader(hdr []byte) (Header, error) {
 	return h, nil
 }
 
-func lastTimestamp(path string) (int64, error) {
-	f, err := os.Open(path)
-	defer f.Close()
-
-	if err != nil {
-		return 0, err
-	}
-
-	discard := make([]byte, 4096)
-	reader := bufio.NewReaderSize(f, 2<<20)
+func lastTimestamp(c *Cache) (int64, error) {
 	var maxTs int64
+	reader := NewReader(c)
 	header := make([]byte, 16)
 	count := 0
 	for {
@@ -295,23 +323,24 @@ func lastTimestamp(path string) (int64, error) {
 			glog.WithFields(logrus.Fields{
 				"ts":         h.ts,
 				"maxts":      maxTs,
-				"path":       f.Name(),
 				"numrecords": count,
 			}).Fatal("Log file doesn't have monotonically increasing records.")
 		}
 
-		for int(h.size) > len(discard) {
-			discard = make([]byte, len(discard)*2)
-		}
-		reader.Read(discard[:int(h.size)])
+		reader.Discard(int(h.size))
 	}
 	return maxTs, nil
 }
 
-// Expects that a lock has already been acquired.
 func (l *Logger) rotateCurrent() error {
+	l.Lock()
+	defer l.Unlock()
+
+	cf := l.cf
+	cf.Lock()
+	defer cf.Unlock()
+
 	if len(l.list) > 0 {
-		// No need to acquire logFile lock.
 		last := l.list[len(l.list)-1]
 		if last.endTs > atomic.LoadInt64(&l.lastLogTs) {
 			return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
@@ -320,11 +349,11 @@ func (l *Logger) rotateCurrent() error {
 
 	lastTs := atomic.LoadInt64(&l.lastLogTs)
 	newpath := filepath.Join(l.dir, l.filepath(lastTs))
-	if err := l.curFile.Close(); err != nil {
+	if err := cf.f.Close(); err != nil {
 		return err
 	}
-	if err := os.Rename(l.curFile.Name(), newpath); err != nil {
-		glog.WithError(err).WithField("curfile", l.curFile.Name()).
+	if err := os.Rename(cf.f.Name(), newpath); err != nil {
+		glog.WithError(err).WithField("curfile", l.cf.f.Name()).
 			WithField("newfile", newpath).Error("While renaming.")
 		return err
 	}
@@ -332,8 +361,11 @@ func (l *Logger) rotateCurrent() error {
 	lf := new(logFile)
 	lf.endTs = lastTs
 	lf.path = newpath
-	lf.size = l.size
+	lf.size = cf.size
+	lf.cache = cf.cache()
 	l.list = append(l.list, lf)
+
+	l.createNew()
 	return nil
 }
 
@@ -348,8 +380,10 @@ func (l *Logger) createNew() {
 	if err != nil {
 		glog.WithError(err).Fatal("Unable to create a new file.")
 	}
-	l.curFile = f
-	l.resetCounters()
+	l.cf = new(CurFile)
+	l.cf.f = f
+	cache := new(Cache)
+	atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
 }
 
 func setError(prev *error, n error) {
@@ -380,32 +414,35 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 	glog.WithField("bytes", buf.Len()).WithField("ts", ts).
 		Debug("Log entry buffer.")
 
-	l.Lock()
-	defer l.Unlock()
-
-	if l.size+int64(buf.Len()) > l.maxSize {
+	if l.curFile().Size()+int64(buf.Len()) > l.maxSize {
 		if err = l.rotateCurrent(); err != nil {
 			glog.WithError(err).Error("While rotating current file out.")
 			return err
 		}
-		l.createNew()
 	}
 
-	if l.curFile == nil {
+	cf := l.curFile()
+	if cf == nil {
 		glog.Fatalf("Current file isn't initialized.")
 	}
 
-	if _, err = l.curFile.Write(buf.Bytes()); err != nil {
+	cf.Lock()
+	defer cf.Unlock()
+	if _, err = cf.f.Write(buf.Bytes()); err != nil {
 		glog.WithError(err).Error("While writing to current file.")
 		return err
 	}
-	l.logsSinceLastSync += 1
+	if _, err = cf.cache().Write(buf.Bytes()); err != nil {
+		glog.WithError(err).Error("While writing to current cache.")
+		return err
+	}
+	cf.dirtyLogs += 1
+	cf.size += int64(buf.Len())
 	l.updateLastLogTs(ts)
-	l.size += int64(buf.Len())
-	if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery {
-		l.logsSinceLastSync = 0
+	if l.SyncEvery <= 0 || cf.dirtyLogs >= l.SyncEvery {
+		cf.dirtyLogs = 0
 		glog.Debug("Syncing file")
-		return l.curFile.Sync()
+		return cf.f.Sync()
 	}
 	return nil
 }
@@ -413,9 +450,6 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 func streamEntriesInFile(path string,
 	afterTs int64, hash uint32, ch chan []byte) error {
 
-	// HACK HACK HACK
-	return nil
-
 	flog := glog.WithField("path", path)
 	f, err := os.Open(path)
 	if err != nil {
@@ -473,13 +507,13 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32,
 	ch chan []byte, done chan error) {
 
 	var paths []string
-	l.Lock()
+	l.RLock()
 	for _, lf := range l.list {
 		if afterTs < lf.endTs {
 			paths = append(paths, lf.path)
 		}
 	}
-	l.Unlock()
+	l.RUnlock()
 
 	{
 		cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
diff --git a/commit/log_test.go b/commit/log_test.go
index 777f654e..ad2e1e52 100644
--- a/commit/log_test.go
+++ b/commit/log_test.go
@@ -22,6 +22,7 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -77,8 +78,8 @@ func TestAddLog(t *testing.T) {
 		time.Sleep(500 * time.Microsecond)
 	}
 
-	glog.Debugf("Test curfile path: %v", l.curFile.Name())
-	last, err := lastTimestamp(l.curFile.Name())
+	glog.Debugf("Test curfile path: %v", l.cf.f.Name())
+	last, err := lastTimestamp(l.cf.cache())
 	if err != nil {
 		t.Error(err)
 	}
@@ -120,10 +121,10 @@ func TestRotatingLog(t *testing.T) {
 			t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
 		}
 	}
-	if l.size != 416 {
-		t.Errorf("Expected size 416. Got: %v", l.size)
+	if l.curFile().Size() != 416 {
+		t.Errorf("Expected size 416. Got: %v", l.curFile().Size())
 	}
-	if l.lastLogTs != ts+int64(8) {
+	if atomic.LoadInt64(&l.lastLogTs) != ts+int64(8) {
 		t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs)
 	}
 	l.Close()
@@ -136,8 +137,8 @@ func TestRotatingLog(t *testing.T) {
 	if len(nl.list) != 4 {
 		t.Errorf("Expected 4 files. Got: %v", len(nl.list))
 	}
-	if nl.size != 416 {
-		t.Errorf("Expected size 416. Got: %v", nl.size)
+	if nl.curFile().Size() != 416 {
+		t.Errorf("Expected size 416. Got: %v", nl.curFile().Size())
 	}
 	if err := nl.AddLog(ts, 0, data); err == nil {
 		t.Error("Adding an entry with older ts should fail.")
@@ -146,8 +147,8 @@ func TestRotatingLog(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	if nl.size != 832 {
-		t.Errorf("Expected size 832. Got: %v", nl.size)
+	if nl.curFile().Size() != 832 {
+		t.Errorf("Expected size 832. Got: %v", nl.curFile().Size())
 	}
 	if err := nl.AddLog(ts+int64(113), 0, data); err != nil {
 		t.Error(err)
@@ -159,8 +160,8 @@ func TestRotatingLog(t *testing.T) {
 	if nl.list[4].endTs != ts+int64(100) {
 		t.Errorf("Expected ts: %v. Got: %v", ts+int64(100), nl.list[4].endTs)
 	}
-	if nl.size != 416 {
-		t.Errorf("Expected size 416. Got: %v", nl.size)
+	if nl.curFile().Size() != 416 {
+		t.Errorf("Expected size 416. Got: %v", nl.curFile().Size())
 	}
 	if nl.lastLogTs != ts+int64(113) {
 		t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs)
@@ -201,8 +202,8 @@ func TestReadEntries(t *testing.T) {
 			t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
 		}
 	}
-	if l.size != 416 {
-		t.Errorf("Expected size 416. Got: %v", l.size)
+	if l.curFile().Size() != 416 {
+		t.Errorf("Expected size 416. Got: %v", l.curFile().Size())
 	}
 	if l.lastLogTs != ts+int64(8) {
 		t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs)
-- 
GitLab