Skip to content
Snippets Groups Projects
Commit 0d322c25 authored by Manish R Jain's avatar Manish R Jain
Browse files

Fix the reads being done without mutex locks. Replace with atomics.

parent f3f7e325
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,7 @@ import ( ...@@ -34,6 +34,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
...@@ -76,7 +77,7 @@ type Logger struct { ...@@ -76,7 +77,7 @@ type Logger struct {
SyncDur time.Duration SyncDur time.Duration
// Skip write to commit log to allow for testing. // Skip write to commit log to allow for testing.
SkipWrite bool skipWrite int32
sync.RWMutex sync.RWMutex
list []*logFile list []*logFile
...@@ -87,6 +88,27 @@ type Logger struct { ...@@ -87,6 +88,27 @@ type Logger struct {
ticker *time.Ticker ticker *time.Ticker
} }
func (l *Logger) SetSkipWrite(val bool) {
var v int32
v = 0
if val {
v = 1
}
atomic.StoreInt32(&l.skipWrite, v)
}
func (l *Logger) updateLastLogTs(val int64) {
for {
prev := atomic.LoadInt64(&l.lastLogTs)
if val <= prev {
return
}
if atomic.CompareAndSwapInt64(&l.lastLogTs, prev, val) {
return
}
}
}
func (l *Logger) resetCounters() { func (l *Logger) resetCounters() {
l.size = 0 l.size = 0
l.logsSinceLastSync = 0 l.logsSinceLastSync = 0
...@@ -139,6 +161,7 @@ func NewLogger(dir string, fileprefix string, maxSize int64) *Logger { ...@@ -139,6 +161,7 @@ func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
return l return l
} }
// A mutex lock should have already been acquired to call this function.
func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
if info.IsDir() { if info.IsDir() {
return nil return nil
...@@ -167,9 +190,7 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { ...@@ -167,9 +190,7 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
lf.path = path lf.path = path
l.list = append(l.list, lf) l.list = append(l.list, lf)
if l.lastLogTs < lf.endTs { l.updateLastLogTs(lf.endTs)
l.lastLogTs = lf.endTs
}
return nil return nil
} }
...@@ -186,10 +207,11 @@ func (l *Logger) Init() { ...@@ -186,10 +207,11 @@ func (l *Logger) Init() {
// we have the file. Derive information for counters. // we have the file. Derive information for counters.
l.size = fi.Size() l.size = fi.Size()
l.logsSinceLastSync = 0 l.logsSinceLastSync = 0
l.lastLogTs, err = lastTimestamp(path) lastTs, err := lastTimestamp(path)
if err != nil { if err != nil {
glog.WithError(err).Fatal("Unable to read last log timestamp.") glog.WithError(err).Fatal("Unable to read last log timestamp.")
} }
l.updateLastLogTs(lastTs)
// Open file for append. // Open file for append.
l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
...@@ -286,16 +308,18 @@ func lastTimestamp(path string) (int64, error) { ...@@ -286,16 +308,18 @@ func lastTimestamp(path string) (int64, error) {
return maxTs, nil return maxTs, nil
} }
// Expects that a lock has already been acquired.
func (l *Logger) rotateCurrent() error { func (l *Logger) rotateCurrent() error {
if len(l.list) > 0 { if len(l.list) > 0 {
// No need to acquire logFile lock. // No need to acquire logFile lock.
last := l.list[len(l.list)-1] last := l.list[len(l.list)-1]
if last.endTs > l.lastLogTs { if last.endTs > atomic.LoadInt64(&l.lastLogTs) {
return fmt.Errorf("Maxtimestamp is lower than existing commit logs.") return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
} }
} }
newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs)) lastTs := atomic.LoadInt64(&l.lastLogTs)
newpath := filepath.Join(l.dir, l.filepath(lastTs))
if err := l.curFile.Close(); err != nil { if err := l.curFile.Close(); err != nil {
return err return err
} }
...@@ -306,7 +330,7 @@ func (l *Logger) rotateCurrent() error { ...@@ -306,7 +330,7 @@ func (l *Logger) rotateCurrent() error {
} }
lf := new(logFile) lf := new(logFile)
lf.endTs = l.lastLogTs lf.endTs = lastTs
lf.path = newpath lf.path = newpath
lf.size = l.size lf.size = l.size
l.list = append(l.list, lf) l.list = append(l.list, lf)
...@@ -336,10 +360,10 @@ func setError(prev *error, n error) { ...@@ -336,10 +360,10 @@ func setError(prev *error, n error) {
} }
func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
if ts < l.lastLogTs { if ts < atomic.LoadInt64(&l.lastLogTs) {
return fmt.Errorf("Timestamp lower than last log timestamp.") return fmt.Errorf("Timestamp lower than last log timestamp.")
} }
if l.SkipWrite { if atomic.LoadInt32(&l.skipWrite) == 1 {
return nil return nil
} }
...@@ -376,7 +400,7 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { ...@@ -376,7 +400,7 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
return err return err
} }
l.logsSinceLastSync += 1 l.logsSinceLastSync += 1
l.lastLogTs = ts l.updateLastLogTs(ts)
l.size += int64(buf.Len()) l.size += int64(buf.Len())
if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery { if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery {
l.logsSinceLastSync = 0 l.logsSinceLastSync = 0
......
...@@ -118,8 +118,8 @@ func GetOrCreate(key []byte) *List { ...@@ -118,8 +118,8 @@ func GetOrCreate(key []byte) *List {
} }
l := NewList() l := NewList()
l.init(key, pstore, clog)
if inserted := lhmap.PutIfMissing(ukey, l); inserted { if inserted := lhmap.PutIfMissing(ukey, l); inserted {
l.init(key, pstore, clog)
return l return l
} else { } else {
lp, _ = lhmap.Get(ukey) lp, _ = lhmap.Get(ukey)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment