Skip to content
Snippets Groups Projects
Commit 74cabf90 authored by Manish R Jain's avatar Manish R Jain
Browse files

Added periodic sync and file rotation. Appending to existing current file is failing.

parent 155f8219
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/x"
......@@ -43,10 +44,10 @@ var glog = x.Log("commitlog")
type logFile struct {
sync.RWMutex
endTs int64
endTs int64 // never modified after creation.
path string
f *os.File
size uint64
size int64
}
type ByTimestamp []*logFile
......@@ -66,21 +67,68 @@ type Logger struct {
// MaxSize is the maximum size of commit log file in bytes,
// before it gets rotated.
maxSize uint64
maxSize int64
// Sync every N logs. A value of zero or less would mean
// sync every append to file.
SyncEvery int
// Sync every d duration.
SyncDur time.Duration
sync.RWMutex
list []*logFile
curFile *os.File
size int64
lastLogTs int64
logsSinceLastSync int
ticker *time.Ticker
}
func (l *Logger) resetCounters() {
l.size = 0
l.logsSinceLastSync = 0
}
func (l *Logger) periodicSync() {
glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
if l.SyncDur == 0 {
glog.Info("No Periodic Sync for commit log.")
return
}
l.ticker = time.NewTicker(l.SyncDur)
for _ = range l.ticker.C {
l.Lock()
if l.curFile != nil && l.logsSinceLastSync > 0 {
if err := l.curFile.Sync(); err != nil {
glog.WithError(err).Error("While periodically syncing.")
} else {
l.logsSinceLastSync = 0
glog.Debug("Successful periodic sync.")
}
} else {
glog.Debug("Skipping periodic sync.")
}
l.Unlock()
}
}
func NewLogger(dir string, fileprefix string, maxSize uint64) *Logger {
func (l *Logger) Close() {
l.Lock()
defer l.Unlock()
if l.ticker != nil {
l.ticker.Stop()
}
if l.curFile != nil {
if err := l.curFile.Close(); err != nil {
glog.WithError(err).Error("While closing current file.")
}
}
}
func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
l := new(Logger)
l.dir = dir
l.filePrefix = fileprefix
......@@ -101,19 +149,36 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
lidx := strings.LastIndex(info.Name(), ".log")
tstring := info.Name()[len(l.filePrefix)+1 : lidx]
glog.WithField("log_ts", tstring).Debug("Found log.")
// Handle if we find the current log file.
if tstring == "current" {
var err error
l.size = info.Size()
l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, os.FileMode(0644))
if err != nil {
glog.WithError(err).Fatal("Unable to open file in write mode.")
}
/*
ret, err := l.curFile.Seek(info.Size(), 0)
if err != nil || ret != info.Size() {
glog.WithError(err).Fatal("Unable to seek to end of file.")
}
*/
l.lastLogTs, err = lastTimestamp(path)
if err != nil {
glog.WithError(err).Fatal("Unable to read last log timestamp.")
}
return nil
}
ts, err := strconv.ParseInt(tstring, 16, 64)
if err != nil {
return err
}
lf := new(logFile)
lf.endTs = ts
lf.path = path
l.list = append(l.list, lf)
return nil
}
......@@ -125,7 +190,10 @@ func (l *Logger) Init() {
glog.WithError(err).Fatal("While walking over directory")
}
sort.Sort(ByTimestamp(l.list))
l.createNew()
if l.curFile == nil {
l.createNew()
}
go l.periodicSync()
}
func (l *Logger) filepath(ts int64) string {
......@@ -199,6 +267,33 @@ func lastTimestamp(path string) (int64, error) {
return maxTs, nil
}
func (l *Logger) rotateCurrent() error {
if len(l.list) > 0 {
// No need to acquire logFile lock.
last := l.list[len(l.list)-1]
if last.endTs > l.lastLogTs {
return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
}
}
newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs))
if err := l.curFile.Close(); err != nil {
return err
}
if err := os.Rename(l.curFile.Name(), newpath); err != nil {
glog.WithError(err).WithField("curfile", l.curFile.Name()).
WithField("newfile", newpath).Error("While renaming.")
return err
}
lf := new(logFile)
lf.endTs = l.lastLogTs
lf.path = newpath
lf.size = l.size
l.list = append(l.list, lf)
return nil
}
// Expects a lock has already been acquired.
func (l *Logger) createNew() {
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
......@@ -211,7 +306,7 @@ func (l *Logger) createNew() {
glog.WithError(err).Fatal("Unable to create a new file.")
}
l.curFile = f
l.size = 0
l.resetCounters()
}
func setError(prev *error, n error) {
......@@ -241,6 +336,15 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
l.Lock()
defer l.Unlock()
if l.size+int64(buf.Len()) > l.maxSize {
if err = l.rotateCurrent(); err != nil {
glog.WithError(err).Error("While rotating current file out.")
return err
}
l.createNew()
}
if l.curFile == nil {
glog.Fatalf("Current file isn't initialized.")
}
......@@ -251,8 +355,10 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
}
l.logsSinceLastSync += 1
l.lastLogTs = ts
l.size += int64(buf.Len())
if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery {
l.logsSinceLastSync = 0
glog.Debug("Syncing file")
return l.curFile.Sync()
}
return nil
......
......@@ -64,7 +64,10 @@ func TestAddLog(t *testing.T) {
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 50<<20)
l.SyncDur = time.Millisecond
l.SyncEvery = 1000 // So, sync after write never gets called.
l.Init()
defer l.Close()
ts := time.Now().UnixNano()
for i := 0; i < 10; i++ {
......@@ -73,6 +76,7 @@ func TestAddLog(t *testing.T) {
t.Error(err)
return
}
time.Sleep(500 * time.Microsecond)
}
glog.Debugf("Test curfile path: %v", l.curFile.Name())
......@@ -84,3 +88,78 @@ func TestAddLog(t *testing.T) {
t.Errorf("Expected %v. Got: %v\n", ts+9, last)
}
}
func TestRotatingLog(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 1024) // 1 kB
l.SyncDur = 0
l.SyncEvery = 0
l.Init()
data := make([]byte, 400)
ts := time.Now().UnixNano()
for i := 0; i < 9; i++ {
curts := ts + int64(i)
if err := l.AddLog(curts, 0, data); err != nil {
t.Error(err)
return
}
}
// This should have created 4 files of 832 bytes each (header + data), and
// the current file should be of size 416.
if len(l.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(l.list))
}
for i, lf := range l.list {
exp := ts + int64(2*i+1)
if lf.endTs != exp {
t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
}
}
if l.size != 416 {
t.Errorf("Expected size 416. Got: %v", l.size)
}
if l.lastLogTs != ts+int64(8) {
t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs)
}
l.Close()
// Now, let's test a re-init of logger.
nl := NewLogger(dir, "dgraph", 1024)
nl.Init()
if len(nl.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(nl.list))
}
if nl.size != 416 {
t.Errorf("Expected size 416. Got: %v", nl.size)
}
if err := l.AddLog(ts+int64(100), 0, data); err != nil {
t.Error(err)
return
}
if nl.size != 832 {
t.Errorf("Expected size 832. Got: %v", nl.size)
}
if err := l.AddLog(ts+int64(113), 0, data); err != nil {
t.Error(err)
return
}
if len(nl.list) != 5 {
t.Errorf("Expected 4 files. Got: %v", len(nl.list))
}
if nl.list[4].endTs != ts+int64(100) {
t.Errorf("Expected ts: %v. Got: %v", ts+int64(100), nl.list[4].endTs)
}
if nl.size != 416 {
t.Errorf("Expected size 416. Got: %v", nl.size)
}
if nl.lastLogTs != ts+int64(113) {
t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment