Skip to content
Snippets Groups Projects
log.go 8.42 KiB
Newer Older
/*
 * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// commit package provides commit logs for storing mutations, as they arrive
// at the server. Mutations also get stored in memory within posting.List.
// So, commit logs are useful to handle machine crashes, and re-init of a
// posting list.
// This package provides functionality to write to a rotating log, and a way
// to quickly filter relevant entries corresponding to an attribute.
package commit

import (
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"github.com/dgraph-io/dgraph/x"
)

var glog = x.Log("commitlog")

type logFile struct {
	sync.RWMutex
	endTs int64 // never modified after creation.
}

type ByTimestamp []*logFile

func (b ByTimestamp) Len() int      { return len(b) }
func (b ByTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByTimestamp) Less(i, j int) bool {
	return b[i].endTs < b[j].endTs
}

type Logger struct {
	// Directory to store logs into.
	dir string

	// Prefix all filenames with this.
	filePrefix string

	// MaxSize is the maximum size of commit log file in bytes,
	// before it gets rotated.
	// Sync every N logs. A value of zero or less would mean
	// sync every append to file.
	SyncEvery int

	sync.RWMutex
	list              []*logFile
	curFile           *os.File
	size              int64
	lastLogTs         int64
	logsSinceLastSync int
	ticker            *time.Ticker
}

func (l *Logger) resetCounters() {
	l.size = 0
	l.logsSinceLastSync = 0
}

func (l *Logger) periodicSync() {
	glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
	if l.SyncDur == 0 {
		glog.Info("No Periodic Sync for commit log.")
		return
	}

	l.ticker = time.NewTicker(l.SyncDur)
	for _ = range l.ticker.C {
		l.Lock()
		if l.curFile != nil && l.logsSinceLastSync > 0 {
			if err := l.curFile.Sync(); err != nil {
				glog.WithError(err).Error("While periodically syncing.")
			} else {
				l.logsSinceLastSync = 0
				glog.Debug("Successful periodic sync.")
			}
		} else {
			glog.Debug("Skipping periodic sync.")
		}
		l.Unlock()
	}
func (l *Logger) Close() {
	l.Lock()
	defer l.Unlock()

	if l.ticker != nil {
		l.ticker.Stop()
	}
	if l.curFile != nil {
		if err := l.curFile.Close(); err != nil {
			glog.WithError(err).Error("While closing current file.")
		}
	}
}

func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
	l := new(Logger)
	l.dir = dir
	l.filePrefix = fileprefix
	l.maxSize = maxSize
	return l
}

func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
	if info.IsDir() {
		return nil
	}
	if !strings.HasPrefix(info.Name(), l.filePrefix+"-") {
		return nil
	}
	if !strings.HasSuffix(info.Name(), ".log") {
		return nil
	}
	lidx := strings.LastIndex(info.Name(), ".log")
	tstring := info.Name()[len(l.filePrefix)+1 : lidx]
	glog.WithField("log_ts", tstring).Debug("Found log.")
		var err error
		l.size = info.Size()
		l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, os.FileMode(0644))
		if err != nil {
			glog.WithError(err).Fatal("Unable to open file in write mode.")
		}
		/*
			ret, err := l.curFile.Seek(info.Size(), 0)
			if err != nil || ret != info.Size() {
				glog.WithError(err).Fatal("Unable to seek to end of file.")
			}
		*/
		l.lastLogTs, err = lastTimestamp(path)
		if err != nil {
			glog.WithError(err).Fatal("Unable to read last log timestamp.")
		}
	ts, err := strconv.ParseInt(tstring, 16, 64)
	if err != nil {
		return err
	}
	lf := new(logFile)
	lf.endTs = ts
	lf.path = path
	l.list = append(l.list, lf)
	return nil
}

func (l *Logger) Init() {
	if err := filepath.Walk(l.dir, l.handleFile); err != nil {
		glog.WithError(err).Fatal("While walking over directory")
	}
	sort.Sort(ByTimestamp(l.list))
	if l.curFile == nil {
		l.createNew()
	}
	go l.periodicSync()
}

func (l *Logger) filepath(ts int64) string {
	return fmt.Sprintf("%s-%s.log", l.filePrefix, strconv.FormatInt(ts, 16))
}

type Header struct {
	ts   int64
	hash uint32
	size int32
}

func parseHeader(hdr []byte) (Header, error) {
	buf := bytes.NewBuffer(hdr)
	var h Header
	var err error
	setError(&err, binary.Read(buf, binary.LittleEndian, &h.ts))
	setError(&err, binary.Read(buf, binary.LittleEndian, &h.hash))
	setError(&err, binary.Read(buf, binary.LittleEndian, &h.size))
	if err != nil {
		glog.WithError(err).Error("While parsing header.")
		return h, err
	}
	return h, nil
}

func lastTimestamp(path string) (int64, error) {
	f, err := os.Open(path)
	defer f.Close()

	if err != nil {
		return 0, err
	}

	reader := bufio.NewReaderSize(f, 2<<20)
	var maxTs int64
	header := make([]byte, 16)
	count := 0
	for {
		n, err := reader.Read(header)
		if err == io.EOF {
			break
		}
		if n < len(header) {
			glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.")
		}
		if err != nil {
			glog.WithError(err).Error("While peeking into reader.")
			return 0, err
		}
		count += 1
		h, err := parseHeader(header)
		if err != nil {
			return 0, err
		}

		if h.ts > maxTs {
			maxTs = h.ts

		} else if h.ts < maxTs {
			glog.WithFields(logrus.Fields{
				"ts":         h.ts,
				"maxts":      maxTs,
				"path":       f.Name(),
				"numrecords": count,
			}).Fatal("Log file doesn't have monotonically increasing records.")
		}

		reader.Discard(int(h.size))
	}
	return maxTs, nil
}

func (l *Logger) rotateCurrent() error {
	if len(l.list) > 0 {
		// No need to acquire logFile lock.
		last := l.list[len(l.list)-1]
		if last.endTs > l.lastLogTs {
			return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
		}
	}

	newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs))
	if err := l.curFile.Close(); err != nil {
		return err
	}
	if err := os.Rename(l.curFile.Name(), newpath); err != nil {
		glog.WithError(err).WithField("curfile", l.curFile.Name()).
			WithField("newfile", newpath).Error("While renaming.")
		return err
	}

	lf := new(logFile)
	lf.endTs = l.lastLogTs
	lf.path = newpath
	lf.size = l.size
	l.list = append(l.list, lf)
	return nil
}

// Expects a lock has already been acquired.
func (l *Logger) createNew() {
	path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
	if err := os.MkdirAll(l.dir, 0744); err != nil {
		glog.WithError(err).Fatal("Unable to create directory.")
	}
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
		os.FileMode(0644))
	if err != nil {
		glog.WithError(err).Fatal("Unable to create a new file.")
	}
	l.curFile = f
}

func setError(prev *error, n error) {
	if prev == nil {
		prev = &n
	}
	return
}

func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
	if ts < l.lastLogTs {
		return fmt.Errorf("Timestamp lower than last log timestamp.")
	}

	buf := new(bytes.Buffer)
	var err error
	setError(&err, binary.Write(buf, binary.LittleEndian, ts))
	setError(&err, binary.Write(buf, binary.LittleEndian, hash))
	setError(&err, binary.Write(buf, binary.LittleEndian, int32(len(value))))
	_, nerr := buf.Write(value)
	setError(&err, nerr)
	if err != nil {
		return err
	}
	glog.WithField("bytes", buf.Len()).WithField("ts", ts).
		Debug("Log entry buffer.")

	l.Lock()
	defer l.Unlock()

	if l.size+int64(buf.Len()) > l.maxSize {
		if err = l.rotateCurrent(); err != nil {
			glog.WithError(err).Error("While rotating current file out.")
			return err
		}
		l.createNew()
	}

	if l.curFile == nil {
		glog.Fatalf("Current file isn't initialized.")
	}

	if _, err = l.curFile.Write(buf.Bytes()); err != nil {
		glog.WithError(err).Error("While writing to current file.")
		return err
	}
	l.logsSinceLastSync += 1
	l.lastLogTs = ts
	if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery {
		l.logsSinceLastSync = 0