Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// commit package provides commit logs for storing mutations, as they arrive
// at the server. Mutations also get stored in memory within posting.List.
// So, commit logs are useful to handle machine crashes, and re-init of a
// posting list.
// This package provides functionality to write to a rotating log, and a way
// to quickly filter relevant entries corresponding to an attribute.
package commit
import (
"bufio"
"bytes"
"encoding/binary"
"strconv"
"strings"
"sync"
Manish R Jain
committed
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/x"
)
var glog = x.Log("commitlog")
type logFile struct {
sync.RWMutex
Manish R Jain
committed
endTs int64 // never modified after creation.
path string
f *os.File
Manish R Jain
committed
size int64
}
type ByTimestamp []*logFile
func (b ByTimestamp) Len() int { return len(b) }
func (b ByTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByTimestamp) Less(i, j int) bool {
return b[i].endTs < b[j].endTs
}
type Logger struct {
// Directory to store logs into.
dir string
// Prefix all filenames with this.
filePrefix string
// MaxSize is the maximum size of commit log file in bytes,
// before it gets rotated.
Manish R Jain
committed
maxSize int64
// Sync every N logs. A value of zero or less would mean
// sync every append to file.
SyncEvery int
Manish R Jain
committed
// Sync every d duration.
SyncDur time.Duration
sync.RWMutex
list []*logFile
curFile *os.File
size int64
lastLogTs int64
logsSinceLastSync int
Manish R Jain
committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
ticker *time.Ticker
}
func (l *Logger) resetCounters() {
l.size = 0
l.logsSinceLastSync = 0
}
func (l *Logger) periodicSync() {
glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
if l.SyncDur == 0 {
glog.Info("No Periodic Sync for commit log.")
return
}
l.ticker = time.NewTicker(l.SyncDur)
for _ = range l.ticker.C {
l.Lock()
if l.curFile != nil && l.logsSinceLastSync > 0 {
if err := l.curFile.Sync(); err != nil {
glog.WithError(err).Error("While periodically syncing.")
} else {
l.logsSinceLastSync = 0
glog.Debug("Successful periodic sync.")
}
} else {
glog.Debug("Skipping periodic sync.")
}
l.Unlock()
}
Manish R Jain
committed
func (l *Logger) Close() {
l.Lock()
defer l.Unlock()
if l.ticker != nil {
l.ticker.Stop()
}
if l.curFile != nil {
if err := l.curFile.Close(); err != nil {
glog.WithError(err).Error("While closing current file.")
}
}
}
func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
l := new(Logger)
l.dir = dir
l.filePrefix = fileprefix
l.maxSize = maxSize
return l
}
func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
if !strings.HasPrefix(info.Name(), l.filePrefix+"-") {
return nil
}
if !strings.HasSuffix(info.Name(), ".log") {
return nil
}
lidx := strings.LastIndex(info.Name(), ".log")
tstring := info.Name()[len(l.filePrefix)+1 : lidx]
glog.WithField("log_ts", tstring).Debug("Found log.")
Manish R Jain
committed
// Handle if we find the current log file.
if tstring == "current" {
return nil
}
Manish R Jain
committed
ts, err := strconv.ParseInt(tstring, 16, 64)
if err != nil {
return err
}
lf := new(logFile)
lf.endTs = ts
lf.path = path
l.list = append(l.list, lf)
return nil
}
func (l *Logger) Init() {
l.Lock()
defer l.Unlock()
{
// First check if we have a current file.
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
fi, err := os.Stat(path)
if err == nil {
// we have the file. Derive information for counters.
l.size = fi.Size()
l.logsSinceLastSync = 0
l.lastLogTs, err = lastTimestamp(path)
if err != nil {
glog.WithError(err).Fatal("Unable to read last log timestamp.")
}
// Open file for append.
l.curFile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
os.FileMode(0644))
if err != nil {
glog.WithError(err).Fatal("Unable to open current file in append mode.")
}
}
}
if err := filepath.Walk(l.dir, l.handleFile); err != nil {
glog.WithError(err).Fatal("While walking over directory")
}
sort.Sort(ByTimestamp(l.list))
Manish R Jain
committed
if l.curFile == nil {
l.createNew()
}
go l.periodicSync()
}
func (l *Logger) filepath(ts int64) string {
return fmt.Sprintf("%s-%s.log", l.filePrefix, strconv.FormatInt(ts, 16))
}
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
type Header struct {
ts int64
hash uint32
size int32
}
func parseHeader(hdr []byte) (Header, error) {
buf := bytes.NewBuffer(hdr)
var h Header
var err error
setError(&err, binary.Read(buf, binary.LittleEndian, &h.ts))
setError(&err, binary.Read(buf, binary.LittleEndian, &h.hash))
setError(&err, binary.Read(buf, binary.LittleEndian, &h.size))
if err != nil {
glog.WithError(err).Error("While parsing header.")
return h, err
}
return h, nil
}
func lastTimestamp(path string) (int64, error) {
f, err := os.Open(path)
defer f.Close()
if err != nil {
return 0, err
}
reader := bufio.NewReaderSize(f, 2<<20)
var maxTs int64
header := make([]byte, 16)
count := 0
for {
n, err := reader.Read(header)
if err == io.EOF {
break
}
if n < len(header) {
glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.")
}
if err != nil {
glog.WithError(err).Error("While peeking into reader.")
return 0, err
}
count += 1
h, err := parseHeader(header)
if err != nil {
return 0, err
}
if h.ts > maxTs {
maxTs = h.ts
} else if h.ts < maxTs {
glog.WithFields(logrus.Fields{
"ts": h.ts,
"maxts": maxTs,
"path": f.Name(),
"numrecords": count,
}).Fatal("Log file doesn't have monotonically increasing records.")
}
reader.Discard(int(h.size))
}
return maxTs, nil
}
Manish R Jain
committed
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
func (l *Logger) rotateCurrent() error {
if len(l.list) > 0 {
// No need to acquire logFile lock.
last := l.list[len(l.list)-1]
if last.endTs > l.lastLogTs {
return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
}
}
newpath := filepath.Join(l.dir, l.filepath(l.lastLogTs))
if err := l.curFile.Close(); err != nil {
return err
}
if err := os.Rename(l.curFile.Name(), newpath); err != nil {
glog.WithError(err).WithField("curfile", l.curFile.Name()).
WithField("newfile", newpath).Error("While renaming.")
return err
}
lf := new(logFile)
lf.endTs = l.lastLogTs
lf.path = newpath
lf.size = l.size
l.list = append(l.list, lf)
return nil
}
// Expects a lock has already been acquired.
func (l *Logger) createNew() {
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
if err := os.MkdirAll(l.dir, 0744); err != nil {
glog.WithError(err).Fatal("Unable to create directory.")
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
os.FileMode(0644))
if err != nil {
glog.WithError(err).Fatal("Unable to create a new file.")
}
l.curFile = f
Manish R Jain
committed
l.resetCounters()
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
}
func setError(prev *error, n error) {
if prev == nil {
prev = &n
}
return
}
func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
if ts < l.lastLogTs {
return fmt.Errorf("Timestamp lower than last log timestamp.")
}
buf := new(bytes.Buffer)
var err error
setError(&err, binary.Write(buf, binary.LittleEndian, ts))
setError(&err, binary.Write(buf, binary.LittleEndian, hash))
setError(&err, binary.Write(buf, binary.LittleEndian, int32(len(value))))
_, nerr := buf.Write(value)
setError(&err, nerr)
if err != nil {
return err
}
glog.WithField("bytes", buf.Len()).WithField("ts", ts).
Debug("Log entry buffer.")
l.Lock()
defer l.Unlock()
Manish R Jain
committed
if l.size+int64(buf.Len()) > l.maxSize {
if err = l.rotateCurrent(); err != nil {
glog.WithError(err).Error("While rotating current file out.")
return err
}
l.createNew()
}
if l.curFile == nil {
glog.Fatalf("Current file isn't initialized.")
}
if _, err = l.curFile.Write(buf.Bytes()); err != nil {
glog.WithError(err).Error("While writing to current file.")
return err
}
l.logsSinceLastSync += 1
l.lastLogTs = ts
Manish R Jain
committed
l.size += int64(buf.Len())
if l.SyncEvery <= 0 || l.logsSinceLastSync >= l.SyncEvery {
l.logsSinceLastSync = 0
Manish R Jain
committed
glog.Debug("Syncing file")
return l.curFile.Sync()
}
return nil