diff --git a/commit/log.go b/commit/log.go index 46bbc1f44349ce70a9d3e38ce75534571254b2eb..7141a583265d7d8a223183bb786458ec7e99a6f5 100644 --- a/commit/log.go +++ b/commit/log.go @@ -34,6 +34,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/Sirupsen/logrus" @@ -76,7 +77,7 @@ type Logger struct { SyncDur time.Duration // Skip write to commit log to allow for testing. - SkipWrite bool + skipWrite int32 sync.RWMutex list []*logFile @@ -87,6 +88,27 @@ type Logger struct { ticker *time.Ticker } +func (l *Logger) SetSkipWrite(val bool) { + var v int32 + v = 0 + if val { + v = 1 + } + atomic.StoreInt32(&l.skipWrite, v) +} + +func (l *Logger) updateLastLogTs(val int64) { + for { + prev := atomic.LoadInt64(&l.lastLogTs) + if val <= prev { + return + } + if atomic.CompareAndSwapInt64(&l.lastLogTs, prev, val) { + return + } + } +} + func (l *Logger) resetCounters() { l.size = 0 l.logsSinceLastSync = 0 @@ -139,6 +161,7 @@ func NewLogger(dir string, fileprefix string, maxSize int64) *Logger { return l } +// A mutex lock should have already been acquired to call this function. func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { if info.IsDir() { return nil @@ -167,9 +190,7 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { lf.path = path l.list = append(l.list, lf) - if l.lastLogTs < lf.endTs { - l.lastLogTs = lf.endTs - } + l.updateLastLogTs(lf.endTs) return nil } @@ -186,10 +207,11 @@ func (l *Logger) Init() { // we have the file. Derive information for counters. l.size = fi.Size() l.logsSinceLastSync = 0 - l.lastLogTs, err = lastTimestamp(path) + lastTs, err := lastTimestamp(path) if err != nil { glog.WithError(err).Fatal("Unable to read last log timestamp.") } + l.updateLastLogTs(lastTs) // Open file for append. l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, @@ -286,16 +308,18 @@ func lastTimestamp(path string) (int64, error) { return maxTs, nil } +// Expects that a lock has already been acquired. func (l *Logger) rotateCurrent() error { if len(l.list) > 0 { // No need to acquire logFile lock. last := l.list[len(l.list)-1] - if last.endTs > l.lastLogTs { + if last.endTs > atomic.LoadInt64(&l.lastLogTs) { return fmt.Errorf("Maxtimestamp is lower than existing commit logs.") } } - newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs)) + lastTs := atomic.LoadInt64(&l.lastLogTs) + newpath := filepath.Join(l.dir, l.filepath(lastTs)) if err := l.curFile.Close(); err != nil { return err } @@ -306,7 +330,7 @@ func (l *Logger) rotateCurrent() error { } lf := new(logFile) - lf.endTs = l.lastLogTs + lf.endTs = lastTs lf.path = newpath lf.size = l.size l.list = append(l.list, lf) @@ -336,10 +360,10 @@ func setError(prev *error, n error) { } func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { - if ts < l.lastLogTs { + if ts < atomic.LoadInt64(&l.lastLogTs) { return fmt.Errorf("Timestamp lower than last log timestamp.") } - if l.SkipWrite { + if atomic.LoadInt32(&l.skipWrite) == 1 { return nil } @@ -376,7 +400,7 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { return err } l.logsSinceLastSync += 1 - l.lastLogTs = ts + l.updateLastLogTs(ts) l.size += int64(buf.Len()) if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery { l.logsSinceLastSync = 0 diff --git a/posting/lists.go b/posting/lists.go index a12874dbe00596a56fb62f36e0d8239fc4ab307b..99c9d433b52f6771a945e9dbfa3ba6a9d3849bb2 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -118,8 +118,8 @@ func GetOrCreate(key []byte) *List { } l := NewList() - l.init(key, pstore, clog) if inserted := lhmap.PutIfMissing(ukey, l); inserted { + l.init(key, pstore, clog) return l } else { lp, _ = lhmap.Get(ukey)