diff --git a/commit/log.go b/commit/log.go index fbdbcecc798f3b2b11864554dce7e9c6da196b89..3c0f8889593a1c79e604735eeadd75002ec30fc9 100644 --- a/commit/log.go +++ b/commit/log.go @@ -23,14 +23,19 @@ package commit import ( - "container/list" + "bufio" + "bytes" + "encoding/binary" "fmt" + "io" "os" "path/filepath" + "sort" "strconv" "strings" "sync" + "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/x" ) @@ -38,9 +43,18 @@ var glog = x.Log("commitlog") type logFile struct { sync.RWMutex - startTs uint64 - f *os.File - size uint64 + endTs int64 + path string + f *os.File + size uint64 +} + +type ByTimestamp []*logFile + +func (b ByTimestamp) Len() int { return len(b) } +func (b ByTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b ByTimestamp) Less(i, j int) bool { + return b[i].endTs < b[j].endTs } type Logger struct { @@ -54,8 +68,16 @@ type Logger struct { // before it gets rotated. maxSize uint64 - flistm sync.RWMutex - flist *list.List + // Sync every N logs. A value of zero or less would mean + // sync every append to file. + SyncEvery int + + sync.RWMutex + list []*logFile + curFile *os.File + size int64 + lastLogTs int64 + logsSinceLastSync int } func NewLogger(dir string, fileprefix string, maxSize uint64) *Logger { @@ -63,7 +85,6 @@ func NewLogger(dir string, fileprefix string, maxSize uint64) *Logger { l.dir = dir l.filePrefix = fileprefix l.maxSize = maxSize - l.flist = list.New() return l } @@ -80,18 +101,159 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { lidx := strings.LastIndex(info.Name(), ".log") tstring := info.Name()[len(l.filePrefix)+1 : lidx] glog.WithField("log_ts", tstring).Debug("Found log.") + if tstring == "current" { + return nil + } + ts, err := strconv.ParseInt(tstring, 16, 64) + if err != nil { + return err + } + + lf := new(logFile) + lf.endTs = ts + lf.path = path + l.list = append(l.list, lf) + return nil } func (l *Logger) Init() { + l.Lock() + defer l.Unlock() + if err := filepath.Walk(l.dir, l.handleFile); err != nil { glog.WithError(err).Fatal("While walking over directory") } + sort.Sort(ByTimestamp(l.list)) + l.createNew() } func (l *Logger) filepath(ts int64) string { return fmt.Sprintf("%s-%s.log", l.filePrefix, strconv.FormatInt(ts, 16)) } -func (l *Logger) AddLog(ts uint64, hash uint32, value []byte) { +type Header struct { + ts int64 + hash uint32 + size int32 +} + +func parseHeader(hdr []byte) (Header, error) { + buf := bytes.NewBuffer(hdr) + var h Header + var err error + setError(&err, binary.Read(buf, binary.LittleEndian, &h.ts)) + setError(&err, binary.Read(buf, binary.LittleEndian, &h.hash)) + setError(&err, binary.Read(buf, binary.LittleEndian, &h.size)) + if err != nil { + glog.WithError(err).Error("While parsing header.") + return h, err + } + return h, nil +} + +func lastTimestamp(path string) (int64, error) { + f, err := os.Open(path) + defer f.Close() + + if err != nil { + return 0, err + } + + reader := bufio.NewReaderSize(f, 2<<20) + var maxTs int64 + header := make([]byte, 16) + count := 0 + for { + n, err := reader.Read(header) + if err == io.EOF { + break + } + if n < len(header) { + glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.") + } + if err != nil { + glog.WithError(err).Error("While peeking into reader.") + return 0, err + } + count += 1 + h, err := parseHeader(header) + if err != nil { + return 0, err + } + + if h.ts > maxTs { + maxTs = h.ts + + } else if h.ts < maxTs { + glog.WithFields(logrus.Fields{ + "ts": h.ts, + "maxts": maxTs, + "path": f.Name(), + "numrecords": count, + }).Fatal("Log file doesn't have monotonically increasing records.") + } + + reader.Discard(int(h.size)) + } + return maxTs, nil +} + +// Expects a lock has already been acquired. +func (l *Logger) createNew() { + path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) + if err := os.MkdirAll(l.dir, 0744); err != nil { + glog.WithError(err).Fatal("Unable to create directory.") + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, + os.FileMode(0644)) + if err != nil { + glog.WithError(err).Fatal("Unable to create a new file.") + } + l.curFile = f + l.size = 0 +} + +func setError(prev *error, n error) { + if prev == nil { + prev = &n + } + return +} + +func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { + if ts < l.lastLogTs { + return fmt.Errorf("Timestamp lower than last log timestamp.") + } + + buf := new(bytes.Buffer) + var err error + setError(&err, binary.Write(buf, binary.LittleEndian, ts)) + setError(&err, binary.Write(buf, binary.LittleEndian, hash)) + setError(&err, binary.Write(buf, binary.LittleEndian, int32(len(value)))) + _, nerr := buf.Write(value) + setError(&err, nerr) + if err != nil { + return err + } + glog.WithField("bytes", buf.Len()).WithField("ts", ts). + Debug("Log entry buffer.") + + l.Lock() + defer l.Unlock() + if l.curFile == nil { + glog.Fatalf("Current file isn't initialized.") + } + + if _, err = l.curFile.Write(buf.Bytes()); err != nil { + glog.WithError(err).Error("While writing to current file.") + return err + } + l.logsSinceLastSync += 1 + l.lastLogTs = ts + if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery { + l.logsSinceLastSync = 0 + return l.curFile.Sync() + } + return nil } diff --git a/commit/log_test.go b/commit/log_test.go index ad5e52274e4dcce816df953dc10a78ba7e55172b..5e342d0bd7b4e453508ccaf624cf05eb9855fc25 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -22,9 +22,13 @@ import ( "path/filepath" "testing" "time" + + "github.com/Sirupsen/logrus" ) func TestHandleFile(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + dir, err := ioutil.TempDir("", "dgraph-log") if err != nil { t.Error(err) @@ -35,8 +39,7 @@ func TestHandleFile(t *testing.T) { l := NewLogger(dir, "dgraph", 50<<20) ts := time.Now().UnixNano() for i := 0; i < 10; i++ { - ts += 1 - fp := filepath.Join(dir, l.filepath(ts)) + fp := filepath.Join(dir, l.filepath(ts+int64(i))) if err := ioutil.WriteFile(fp, []byte("test calling"), os.ModeAppend); err != nil { t.Error(err) @@ -44,4 +47,40 @@ func TestHandleFile(t *testing.T) { } } l.Init() + for i, lf := range l.list { + exp := ts + int64(i) + if lf.endTs != exp { + t.Errorf("Expected %v. Got: %v", exp, lf.endTs) + } + } +} + +func TestAddLog(t *testing.T) { + dir, err := ioutil.TempDir("", "dgraph-log") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir) + + l := NewLogger(dir, "dgraph", 50<<20) + l.Init() + + ts := time.Now().UnixNano() + for i := 0; i < 10; i++ { + curts := ts + int64(i) + if err := l.AddLog(curts, 0, []byte("hey")); err != nil { + t.Error(err) + return + } + } + + glog.Debugf("Test curfile path: %v", l.curFile.Name()) + last, err := lastTimestamp(l.curFile.Name()) + if err != nil { + t.Error(err) + } + if last != ts+9 { + t.Errorf("Expected %v. Got: %v\n", ts+9, last) + } }