diff --git a/commit/log.go b/commit/log.go index 3c0f8889593a1c79e604735eeadd75002ec30fc9..12c5de8db272967a04b60b545699c9d281591c10 100644 --- a/commit/log.go +++ b/commit/log.go @@ -34,6 +34,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/x" @@ -43,10 +44,10 @@ var glog = x.Log("commitlog") type logFile struct { sync.RWMutex - endTs int64 + endTs int64 // never modified after creation. path string f *os.File - size uint64 + size int64 } type ByTimestamp []*logFile @@ -66,21 +67,68 @@ type Logger struct { // MaxSize is the maximum size of commit log file in bytes, // before it gets rotated. - maxSize uint64 + maxSize int64 // Sync every N logs. A value of zero or less would mean // sync every append to file. SyncEvery int + // Sync every d duration. + SyncDur time.Duration + sync.RWMutex list []*logFile curFile *os.File size int64 lastLogTs int64 logsSinceLastSync int + ticker *time.Ticker +} + +func (l *Logger) resetCounters() { + l.size = 0 + l.logsSinceLastSync = 0 +} + +func (l *Logger) periodicSync() { + glog.WithField("dur", l.SyncDur).Debug("Periodic sync.") + if l.SyncDur == 0 { + glog.Info("No Periodic Sync for commit log.") + return + } + + l.ticker = time.NewTicker(l.SyncDur) + for _ = range l.ticker.C { + l.Lock() + if l.curFile != nil && l.logsSinceLastSync > 0 { + if err := l.curFile.Sync(); err != nil { + glog.WithError(err).Error("While periodically syncing.") + } else { + l.logsSinceLastSync = 0 + glog.Debug("Successful periodic sync.") + } + } else { + glog.Debug("Skipping periodic sync.") + } + l.Unlock() + } } -func NewLogger(dir string, fileprefix string, maxSize uint64) *Logger { +func (l *Logger) Close() { + l.Lock() + defer l.Unlock() + + if l.ticker != nil { + l.ticker.Stop() + } + if l.curFile != nil { + if err := l.curFile.Close(); err != nil { + glog.WithError(err).Error("While closing current file.") + } + } +} + +func NewLogger(dir string, fileprefix string, maxSize int64) *Logger { l := new(Logger) l.dir = dir l.filePrefix = fileprefix @@ -101,19 +149,36 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { lidx := strings.LastIndex(info.Name(), ".log") tstring := info.Name()[len(l.filePrefix)+1 : lidx] glog.WithField("log_ts", tstring).Debug("Found log.") + + // Handle if we find the current log file. if tstring == "current" { + var err error + l.size = info.Size() + l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, os.FileMode(0644)) + if err != nil { + glog.WithError(err).Fatal("Unable to open file in write mode.") + } + /* + ret, err := l.curFile.Seek(info.Size(), 0) + if err != nil || ret != info.Size() { + glog.WithError(err).Fatal("Unable to seek to end of file.") + } + */ + l.lastLogTs, err = lastTimestamp(path) + if err != nil { + glog.WithError(err).Fatal("Unable to read last log timestamp.") + } return nil } + ts, err := strconv.ParseInt(tstring, 16, 64) if err != nil { return err } - lf := new(logFile) lf.endTs = ts lf.path = path l.list = append(l.list, lf) - return nil } @@ -125,7 +190,10 @@ func (l *Logger) Init() { glog.WithError(err).Fatal("While walking over directory") } sort.Sort(ByTimestamp(l.list)) - l.createNew() + if l.curFile == nil { + l.createNew() + } + go l.periodicSync() } func (l *Logger) filepath(ts int64) string { @@ -199,6 +267,33 @@ func lastTimestamp(path string) (int64, error) { return maxTs, nil } +func (l *Logger) rotateCurrent() error { + if len(l.list) > 0 { + // No need to acquire logFile lock. + last := l.list[len(l.list)-1] + if last.endTs > l.lastLogTs { + return fmt.Errorf("Maxtimestamp is lower than existing commit logs.") + } + } + + newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs)) + if err := l.curFile.Close(); err != nil { + return err + } + if err := os.Rename(l.curFile.Name(), newpath); err != nil { + glog.WithError(err).WithField("curfile", l.curFile.Name()). + WithField("newfile", newpath).Error("While renaming.") + return err + } + + lf := new(logFile) + lf.endTs = l.lastLogTs + lf.path = newpath + lf.size = l.size + l.list = append(l.list, lf) + return nil +} + // Expects a lock has already been acquired. func (l *Logger) createNew() { path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) @@ -211,7 +306,7 @@ func (l *Logger) createNew() { glog.WithError(err).Fatal("Unable to create a new file.") } l.curFile = f - l.size = 0 + l.resetCounters() } func setError(prev *error, n error) { @@ -241,6 +336,15 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { l.Lock() defer l.Unlock() + + if l.size+int64(buf.Len()) > l.maxSize { + if err = l.rotateCurrent(); err != nil { + glog.WithError(err).Error("While rotating current file out.") + return err + } + l.createNew() + } + if l.curFile == nil { glog.Fatalf("Current file isn't initialized.") } @@ -251,8 +355,10 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { } l.logsSinceLastSync += 1 l.lastLogTs = ts + l.size += int64(buf.Len()) if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery { l.logsSinceLastSync = 0 + glog.Debug("Syncing file") return l.curFile.Sync() } return nil diff --git a/commit/log_test.go b/commit/log_test.go index 5e342d0bd7b4e453508ccaf624cf05eb9855fc25..810ca265d5d543a7c14f938e83f108144df1807b 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -64,7 +64,10 @@ func TestAddLog(t *testing.T) { defer os.RemoveAll(dir) l := NewLogger(dir, "dgraph", 50<<20) + l.SyncDur = time.Millisecond + l.SyncEvery = 1000 // So, sync after write never gets called. l.Init() + defer l.Close() ts := time.Now().UnixNano() for i := 0; i < 10; i++ { @@ -73,6 +76,7 @@ func TestAddLog(t *testing.T) { t.Error(err) return } + time.Sleep(500 * time.Microsecond) } glog.Debugf("Test curfile path: %v", l.curFile.Name()) @@ -84,3 +88,78 @@ func TestAddLog(t *testing.T) { t.Errorf("Expected %v. Got: %v\n", ts+9, last) } } + +func TestRotatingLog(t *testing.T) { + dir, err := ioutil.TempDir("", "dgraph-log") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir) + + l := NewLogger(dir, "dgraph", 1024) // 1 kB + l.SyncDur = 0 + l.SyncEvery = 0 + l.Init() + + data := make([]byte, 400) + ts := time.Now().UnixNano() + for i := 0; i < 9; i++ { + curts := ts + int64(i) + if err := l.AddLog(curts, 0, data); err != nil { + t.Error(err) + return + } + } + // This should have created 4 files of 832 bytes each (header + data), and + // the current file should be of size 416. + if len(l.list) != 4 { + t.Errorf("Expected 4 files. Got: %v", len(l.list)) + } + for i, lf := range l.list { + exp := ts + int64(2*i+1) + if lf.endTs != exp { + t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs) + } + } + if l.size != 416 { + t.Errorf("Expected size 416. Got: %v", l.size) + } + if l.lastLogTs != ts+int64(8) { + t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs) + } + l.Close() + + // Now, let's test a re-init of logger. + nl := NewLogger(dir, "dgraph", 1024) + nl.Init() + if len(nl.list) != 4 { + t.Errorf("Expected 4 files. Got: %v", len(nl.list)) + } + if nl.size != 416 { + t.Errorf("Expected size 416. Got: %v", nl.size) + } + if err := l.AddLog(ts+int64(100), 0, data); err != nil { + t.Error(err) + return + } + if nl.size != 832 { + t.Errorf("Expected size 832. Got: %v", nl.size) + } + if err := l.AddLog(ts+int64(113), 0, data); err != nil { + t.Error(err) + return + } + if len(nl.list) != 5 { + t.Errorf("Expected 4 files. Got: %v", len(nl.list)) + } + if nl.list[4].endTs != ts+int64(100) { + t.Errorf("Expected ts: %v. Got: %v", ts+int64(100), nl.list[4].endTs) + } + if nl.size != 416 { + t.Errorf("Expected size 416. Got: %v", nl.size) + } + if nl.lastLogTs != ts+int64(113) { + t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs) + } +}