/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package commit provides commit logs for storing mutations as they arrive
// at the server. Mutations also get stored in memory within posting.List,
// so commit logs are useful for recovering from machine crashes and for
// re-initializing a posting list.
//
// This package provides functionality to write to a rotating log, and a way
// to quickly filter relevant entries corresponding to an attribute.
package commit
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/x"
	// NOTE(review): createAndUpdateBloomFilter calls bloom.NewWithEstimates,
	// but no import providing the "bloom" package is visible here — confirm
	// the import path and restore it.
)
// glog is the logger used throughout this file; it is created once via
// x.Log("commitlog").
var glog = x.Log("commitlog")
type logFile struct {
sync.RWMutex
Manish R Jain
committed
endTs int64 // never modified after creation.
path string
Manish R Jain
committed
size int64
Manish R Jain
committed
cache *Cache
Manish R Jain
committed
}
Manish R Jain
committed
// Cache returns the cache currently attached to this log file, reading the
// field under the read lock. The result is nil when no cache is attached.
func (lf *logFile) Cache() *Cache {
	lf.RLock()
	current := lf.cache
	lf.RUnlock()
	return current
}
// FillIfEmpty loads the log file at lf.path into a fresh cache and builds its
// bloom filter, unless a cache is already attached. wg.Done is always called
// before the method returns, so callers can wait on wg for completion.
// A failure to fill the cache is fatal.
func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) {
	lf.Lock()
	defer lf.Unlock()
	defer wg.Done()

	if lf.cache == nil {
		fresh := new(Cache)
		if err := FillCache(fresh, lf.path); err != nil {
			glog.WithError(err).WithField("path", lf.path).
				Fatal("Unable to fill cache.")
		}
		// fresh was created just above and has not been published yet, so
		// no lock on it is needed here.
		createAndUpdateBloomFilter(fresh)
		lf.cache = fresh
	}
}
// Lock must have been acquired.
func createAndUpdateBloomFilter(cache *Cache) {
hashes := make([]uint32, 50000)
hashes = hashes[:0]
if err := streamEntries(cache, 0, 0, func(hdr Header, record []byte) {
hashes = append(hashes, hdr.hash)
}); err != nil {
glog.WithError(err).Fatal("Unable to create bloom filters.")
}
n := 100000
if len(hashes) > n {
n = len(hashes)
}
cache.bf = bloom.NewWithEstimates(uint(n), 0.0001)
for _, hash := range hashes {
cache.bf.Add(toBytes(hash))
Manish R Jain
committed
}
}
Manish R Jain
committed
// CurFile is the state kept for the commit log file that is currently being
// appended to. The embedded RWMutex guards the plain fields; cch is accessed
// only through sync/atomic.
type CurFile struct {
	sync.RWMutex

	// f is the open handle of the current log file.
	f *os.File
	// size is the number of bytes accounted to the file so far.
	size int64
	// dirtyLogs counts entries written since the last sync.
	dirtyLogs int

	// cch holds the *Cache for this file.
	cch unsafe.Pointer // handled via atomics.
}
func (c *CurFile) cache() *Cache {
Manish R Jain
committed
if c == nil {
debug.PrintStack()
// This got triggered due to a premature cleanup in query_test.go
}
Manish R Jain
committed
v := atomic.LoadPointer(&c.cch)
if v == nil {
return nil
}
return (*Cache)(v)
}
// Size reports the recorded size of the current file, read under the read
// lock.
func (c *CurFile) Size() int64 {
	c.RLock()
	sz := c.size
	c.RUnlock()
	return sz
}
// ByTimestamp orders log files by ascending endTs. It provides the Len, Swap
// and Less methods needed by sort.Sort.
type ByTimestamp []*logFile

// Len returns the number of log files in the slice.
func (b ByTimestamp) Len() int {
	return len(b)
}

// Swap exchanges the log files at positions i and j.
func (b ByTimestamp) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

// Less reports whether the file at i ends strictly before the file at j.
func (b ByTimestamp) Less(i, j int) bool {
	left, right := b[i], b[j]
	return left.endTs < right.endTs
}
type Logger struct {
// Directory to store logs into.
dir string
// Prefix all filenames with this.
filePrefix string
// MaxSize is the maximum size of commit log file in bytes,
// before it gets rotated.
Manish R Jain
committed
maxSize int64
// Sync every N logs. A value of zero or less would mean
// sync every append to file.
SyncEvery int
Manish R Jain
committed
// Sync every d duration.
SyncDur time.Duration
sync.RWMutex
Manish R Jain
committed
list []*logFile
cf *CurFile
lastLogTs int64 // handled via atomics.
ticker *time.Ticker
}
func (l *Logger) curFile() *CurFile {
l.RLock()
defer l.RUnlock()
return l.cf
Manish R Jain
committed
}
// updateLastLogTs raises l.lastLogTs to val if val is larger than the stored
// value. It retries the compare-and-swap until it succeeds or another writer
// has already stored a value at least as large as val.
func (l *Logger) updateLastLogTs(val int64) {
	cur := atomic.LoadInt64(&l.lastLogTs)
	for val > cur {
		if atomic.CompareAndSwapInt64(&l.lastLogTs, cur, val) {
			return
		}
		// Lost the race: re-read and re-evaluate against the new value.
		cur = atomic.LoadInt64(&l.lastLogTs)
	}
}
Manish R Jain
committed
func (l *Logger) DeleteCacheOlderThan(v time.Duration) {
l.RLock()
defer l.RUnlock()
s := int64(v.Seconds())
for _, lf := range l.list {
if lf.Cache().LastAccessedInSeconds() > s {
lf.Lock()
lf.cache = nil
lf.Unlock()
}
}
}
Manish R Jain
committed
func (l *Logger) periodicSync() {
glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
if l.SyncDur == 0 {
glog.Debug("No Periodic Sync for commit log.")
Manish R Jain
committed
return
}
l.ticker = time.NewTicker(l.SyncDur)
for _ = range l.ticker.C {
Manish R Jain
committed
cf := l.curFile()
if cf == nil {
continue
}
{
cf.Lock()
if cf.dirtyLogs > 0 {
if err := cf.f.Sync(); err != nil {
glog.WithError(err).Error("While periodically syncing.")
} else {
cf.dirtyLogs = 0
glog.Debug("Successful periodic sync.")
}
Manish R Jain
committed
} else {
Manish R Jain
committed
glog.Debug("Skipping periodic sync.")
Manish R Jain
committed
}
Manish R Jain
committed
cf.Unlock()
Manish R Jain
committed
}
}
Manish R Jain
committed
func (l *Logger) Close() {
l.Lock()
defer l.Unlock()
if l.ticker != nil {
l.ticker.Stop()
}
Manish R Jain
committed
if l.cf != nil {
if err := l.cf.f.Close(); err != nil {
Manish R Jain
committed
glog.WithError(err).Error("While closing current file.")
}
Manish R Jain
committed
l.cf = nil
Manish R Jain
committed
}
}
// NewLogger returns a Logger that stores its files in dir, names them with
// fileprefix, and rotates the current file once it would exceed maxSize
// bytes. The returned logger opens no files itself.
func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
	return &Logger{
		dir:        dir,
		filePrefix: fileprefix,
		maxSize:    maxSize,
	}
}
// A mutex lock should have already been acquired to call this function.
func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
if !strings.HasPrefix(info.Name(), l.filePrefix+"-") {
return nil
}
if !strings.HasSuffix(info.Name(), ".log") {
return nil
}
lidx := strings.LastIndex(info.Name(), ".log")
tstring := info.Name()[len(l.filePrefix)+1 : lidx]
glog.WithField("log_ts", tstring).Debug("Found log.")
Manish R Jain
committed
// Handle if we find the current log file.
if tstring == "current" {
return nil
}
Manish R Jain
committed
ts, err := strconv.ParseInt(tstring, 16, 64)
if err != nil {
return err
}
lf := new(logFile)
lf.endTs = ts
lf.path = path
l.list = append(l.list, lf)
l.updateLastLogTs(lf.endTs)
return nil
}
func (l *Logger) Init() {
l.Lock()
defer l.Unlock()
glog.Debug("Logger init started.")
// Checking if the directory exists.
if _, err := os.Stat(l.dir); err != nil {
if os.IsNotExist(err) {
glog.WithError(err).Fatal("Unable to find dir.")
}
}
// First check if we have a current file.
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
fi, err := os.Stat(path)
if err == nil {
// we have the file. Derive information for counters.
Manish R Jain
committed
l.cf = new(CurFile)
l.cf.size = fi.Size()
l.cf.dirtyLogs = 0
cache := new(Cache)
if ferr := FillCache(cache, path); ferr != nil {
glog.WithError(ferr).Fatal("Unable to write to cache.")
}
Manish R Jain
committed
atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
lastTs, err := lastTimestamp(cache)
if err != nil {
glog.WithError(err).Fatal("Unable to read last log timestamp.")
}
l.updateLastLogTs(lastTs)
Manish R Jain
committed
l.cf.f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
os.FileMode(0644))
if err != nil {
glog.WithError(err).Fatal("Unable to open current file in append mode.")
}
}
}
if err := filepath.Walk(l.dir, l.handleFile); err != nil {
glog.WithError(err).Fatal("While walking over directory")
}
sort.Sort(ByTimestamp(l.list))
Manish R Jain
committed
if l.cf == nil {
Manish R Jain
committed
l.createNew()
}
go l.periodicSync()
glog.Debug("Logger init finished.")
}
// filepath returns the file name (without directory) for a rotated log that
// ends at ts: "<filePrefix>-<ts in base 16>.log".
func (l *Logger) filepath(ts int64) string {
	hexTs := strconv.FormatInt(ts, 16)
	return fmt.Sprintf("%s-%s.log", l.filePrefix, hexTs)
}
// Header is the fixed 16-byte prefix of every commit log entry: an 8-byte
// timestamp, a 4-byte hash and a 4-byte payload length, all little-endian.
type Header struct {
	ts   int64  // entry timestamp.
	hash uint32 // hash the entry was logged under.
	size int32  // number of payload bytes following the header.
}

// parseHeader decodes the first 16 bytes of hdr into a Header.
//
// It returns io.ErrUnexpectedEOF, together with a zero Header, when hdr holds
// fewer than 16 bytes; any bytes beyond the first 16 are ignored. The error
// is returned without being logged, leaving that to the caller.
func parseHeader(hdr []byte) (Header, error) {
	const headerLen = 16 // 8 (ts) + 4 (hash) + 4 (size)

	if len(hdr) < headerLen {
		return Header{}, io.ErrUnexpectedEOF
	}
	return Header{
		ts:   int64(binary.LittleEndian.Uint64(hdr[0:8])),
		hash: binary.LittleEndian.Uint32(hdr[8:12]),
		size: int32(binary.LittleEndian.Uint32(hdr[12:16])),
	}, nil
}
func lastTimestamp(c *Cache) (int64, error) {
var maxTs int64
Manish R Jain
committed
reader := NewReader(c)
header := make([]byte, 16)
count := 0
for {
n, err := reader.Read(header)
if err == io.EOF {
break
}
if n < len(header) {
glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.")
}
if err != nil {
glog.WithError(err).Error("While reading header.")
return 0, err
}
count += 1
h, err := parseHeader(header)
if err != nil {
return 0, err
}
if h.ts > maxTs {
maxTs = h.ts
} else if h.ts < maxTs {
glog.WithFields(logrus.Fields{
"ts": h.ts,
"maxts": maxTs,
"numrecords": count,
}).Fatal("Log file doesn't have monotonically increasing records.")
}
Manish R Jain
committed
reader.Discard(int(h.size))
}
return maxTs, nil
}
Manish R Jain
committed
func (l *Logger) rotateCurrent() error {
Manish R Jain
committed
l.Lock()
defer l.Unlock()
cf := l.cf
cf.Lock()
defer cf.Unlock()
Manish R Jain
committed
if len(l.list) > 0 {
last := l.list[len(l.list)-1]
if last.endTs > atomic.LoadInt64(&l.lastLogTs) {
Manish R Jain
committed
return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
}
}
lastTs := atomic.LoadInt64(&l.lastLogTs)
newpath := filepath.Join(l.dir, l.filepath(lastTs))
Manish R Jain
committed
if err := cf.f.Close(); err != nil {
Manish R Jain
committed
return err
}
Manish R Jain
committed
if err := os.Rename(cf.f.Name(), newpath); err != nil {
glog.WithError(err).WithField("curfile", l.cf.f.Name()).
Manish R Jain
committed
WithField("newfile", newpath).Error("While renaming.")
return err
}
lf := new(logFile)
lf.endTs = lastTs
Manish R Jain
committed
lf.path = newpath
Manish R Jain
committed
lf.size = cf.size
lf.cache = cf.cache()
Manish R Jain
committed
l.list = append(l.list, lf)
Manish R Jain
committed
l.createNew()
Manish R Jain
committed
return nil
}
// Expects a lock has already been acquired.
func (l *Logger) createNew() {
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
if err := os.MkdirAll(l.dir, 0744); err != nil {
glog.WithError(err).Fatal("Unable to create directory.")
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
os.FileMode(0644))
if err != nil {
glog.WithError(err).Fatal("Unable to create a new file.")
}
Manish R Jain
committed
l.cf = new(CurFile)
l.cf.f = f
cache := new(Cache)
Manish R Jain
committed
atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
}
// setError records n in *prev unless *prev already holds an error, so that a
// chain of calls keeps the first failure. A nil n leaves *prev unchanged in
// effect, and a nil prev is ignored.
func setError(prev *error, n error) {
	if prev == nil {
		return
	}
	// Assign through the pointer; rebinding the local parameter would be
	// invisible to the caller.
	if *prev == nil {
		*prev = n
	}
}
func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) {
lbuf := int64(len(value)) + 16
if l.curFile().Size()+lbuf > l.maxSize {
if err := l.rotateCurrent(); err != nil {
glog.WithError(err).Error("While rotating current file out.")
return 0, err
}
}
cf := l.curFile()
if cf == nil {
glog.Fatalf("Current file isn't initialized.")
}
cf.Lock()
defer cf.Unlock()
ts := time.Now().UnixNano()
lts := atomic.LoadInt64(&l.lastLogTs)
if ts < lts {
ts = lts + 1
// We don't have to do CompareAndSwap because we've a mutex lock.
}
buf := new(bytes.Buffer)
var err error
setError(&err, binary.Write(buf, binary.LittleEndian, ts))
setError(&err, binary.Write(buf, binary.LittleEndian, hash))
setError(&err, binary.Write(buf, binary.LittleEndian, int32(len(value))))
_, nerr := buf.Write(value)
setError(&err, nerr)
if err != nil {
return ts, err
glog.WithField("bytes", buf.Len()).Debug("Log entry buffer.")
Manish R Jain
committed
if _, err = cf.f.Write(buf.Bytes()); err != nil {
glog.WithError(err).Error("While writing to current file.")
return ts, err
if _, err = cf.cache().Write(hash, buf.Bytes()); err != nil {
Manish R Jain
committed
glog.WithError(err).Error("While writing to current cache.")
return ts, err
Manish R Jain
committed
}
cf.dirtyLogs += 1
cf.size += int64(buf.Len())
l.updateLastLogTs(ts)
Manish R Jain
committed
if l.SyncEvery <= 0 || cf.dirtyLogs >= l.SyncEvery {
cf.dirtyLogs = 0
Manish R Jain
committed
glog.Debug("Syncing file")
return ts, cf.f.Sync()
return ts, nil
// streamEntries allows for hash to be zero.
// This means iterate over all the entries.
Manish R Jain
committed
func streamEntries(cache *Cache,
afterTs int64, hash uint32, iter LogIterator) error {
Manish R Jain
committed
flog := glog
reader := NewReader(cache)
header := make([]byte, 16)
for {
Manish R Jain
committed
_, err := reader.Read(header)
if err == io.EOF {
Manish R Jain
committed
flog.Debug("Cache read complete.")
break
}
if err != nil {
Manish R Jain
committed
flog.WithError(err).Fatal("While reading header.")
return err
}
Manish R Jain
committed
hdr, err := parseHeader(header)
if err != nil {
flog.WithError(err).Error("While parsing header.")
return err
}
Manish R Jain
committed
if (hash == 0 || hdr.hash == hash) && hdr.ts >= afterTs {
// Iterator expects a copy of the buffer, so create one, instead of
// creating a big buffer upfront and reusing it.
data := make([]byte, hdr.size)
Manish R Jain
committed
_, err := reader.Read(data)
if err != nil {
Manish R Jain
committed
flog.WithError(err).Fatal("While reading data.")
return err
}
} else {
Manish R Jain
committed
reader.Discard(int(hdr.size))
}
}
return nil
}
// LogIterator is the callback type accepted by streamEntries and
// StreamEntries. It takes an entry's Header and a byte slice for the record.
type LogIterator func(hdr Header, record []byte)
Manish R Jain
committed
func (l *Logger) StreamEntries(afterTs int64, hash uint32,
Manish R Jain
committed
iter LogIterator) error {
Manish R Jain
committed
if atomic.LoadInt64(&l.lastLogTs) < afterTs {
return nil
}
Manish R Jain
committed
var wg sync.WaitGroup
Manish R Jain
committed
l.RLock()
for _, lf := range l.list {
if afterTs < lf.endTs {
Manish R Jain
committed
wg.Add(1)
go lf.FillIfEmpty(&wg)
Manish R Jain
committed
l.RUnlock()
Manish R Jain
committed
wg.Wait()
Manish R Jain
committed
l.RLock()
var caches []*Cache
for _, lf := range l.list {
if afterTs < lf.endTs && lf.cache.Present(hash) {
Manish R Jain
committed
caches = append(caches, lf.Cache())
Manish R Jain
committed
l.RUnlock()
// Add current cache.
if l.curFile().cache().Present(hash) {
caches = append(caches, l.curFile().cache())
}
Manish R Jain
committed
for _, cache := range caches {
if cache == nil {
glog.Error("Cache is nil")
continue
}
if err := streamEntries(cache, afterTs, hash, iter); err != nil {
return err
Manish R Jain
committed
return nil