diff --git a/commit/log.go b/commit/log.go index 7d3ef0eac9b9c04b04796cad6b2c8c118330280e..ecc329bdebd4ced78c30f76e35795a03c9456668 100644 --- a/commit/log.go +++ b/commit/log.go @@ -46,7 +46,6 @@ type logFile struct { sync.RWMutex endTs int64 // never modified after creation. path string - f *os.File size int64 } @@ -247,7 +246,7 @@ func lastTimestamp(path string) (int64, error) { glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.") } if err != nil { - glog.WithError(err).Error("While peeking into reader.") + glog.WithError(err).Error("While reading header.") return 0, err } count += 1 @@ -369,3 +368,80 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { } return nil } + +func streamEntriesInFile(path string, + afterTs int64, hash uint32, ch chan []byte) error { + + flog := glog.WithField("path", path) + f, err := os.Open(path) + if err != nil { + flog.WithError(err).Error("While opening file.") + return err + } + defer f.Close() + + reader := bufio.NewReaderSize(f, 5<<20) + header := make([]byte, 16) + for { + n, err := reader.Read(header) + if err == io.EOF { + flog.Debug("File read complete.") + break + } + if n != len(header) { + flog.WithField("n", n).Fatal("Unable to read header.") + } + if err != nil { + flog.WithError(err).Error("While reading header.") + return err + } + hdr, err := parseHeader(header) + if err != nil { + flog.WithError(err).Error("While parsing header.") + return err + } + if hdr.hash == hash && hdr.ts >= afterTs { + data := make([]byte, hdr.size) + n, err := reader.Read(data) + if err != nil { + flog.WithError(err).Error("While reading data.") + return err + } + if int32(n) != hdr.size { + flog.WithField("n", n).Fatal("Unable to read data.") + } + ch <- data + + } else { + reader.Discard(int(hdr.size)) + } + } + return nil +} + +func (l *Logger) StreamEntries(afterTs int64, hash uint32, + ch chan []byte, done chan error) { + + var paths []string + l.Lock() + for _, lf := range l.list { + if afterTs < lf.endTs { + paths = append(paths, lf.path) + } + } + l.Unlock() + + { + cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) + paths = append(paths, cur) + } + for _, path := range paths { + if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil { + close(ch) + done <- err + return + } + } + close(ch) + done <- nil +} diff --git a/commit/log_test.go b/commit/log_test.go index 0c76d30a3e65281b33fa409dd1f7418f03f6f21b..49145a006065d130f9dea4308b2ff65c62160a25 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -17,6 +17,7 @@ package commit import ( + "bytes" "io/ioutil" "os" "path/filepath" @@ -141,6 +142,9 @@ func TestRotatingLog(t *testing.T) { if nl.size != 416 { t.Errorf("Expected size 416. Got: %v", nl.size) } + if err := nl.AddLog(ts, 0, data); err == nil { + t.Error("Adding an entry with older ts should fail.") + } if err := nl.AddLog(ts+int64(100), 0, data); err != nil { t.Error(err) return @@ -165,3 +169,87 @@ func TestRotatingLog(t *testing.T) { t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs) } } + +func TestReadEntries(t *testing.T) { + dir, err := ioutil.TempDir("", "dgraph-log") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir) + + l := NewLogger(dir, "dgraph", 1024) // 1 kB + l.SyncDur = 0 + l.SyncEvery = 0 + l.Init() + defer l.Close() + + data := make([]byte, 400) + ts := time.Now().UnixNano() + for i := 0; i < 9; i++ { + curts := ts + int64(i) + if err := l.AddLog(curts, uint32(i%3), data); err != nil { + t.Error(err) + return + } + } + // This should have created 4 files of 832 bytes each (header + data), and + // the current file should be of size 416. + if len(l.list) != 4 { + t.Errorf("Expected 4 files. Got: %v", len(l.list)) + } + for i, lf := range l.list { + exp := ts + int64(2*i+1) + if lf.endTs != exp { + t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs) + } + } + if l.size != 416 { + t.Errorf("Expected size 416. Got: %v", l.size) + } + if l.lastLogTs != ts+int64(8) { + t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs) + } + + { + // Check for hash = 1, ts >= 2. + ch := make(chan []byte, 10) + done := make(chan error) + go l.StreamEntries(ts+2, uint32(1), ch, done) + count := 0 + for val := range ch { + count += 1 + if bytes.Compare(data, val) != 0 { + t.Error("Data doesn't equate.") + } + } + if err := <-done; err != nil { + t.Error(err) + } + if count != 2 { + t.Errorf("Expected 2 entries. Got: %v", count) + } + } + { + if err := l.AddLog(ts+int64(9), 1, data); err != nil { + t.Error(err) + } + // Check for hash = 1, ts >= 2. + ch := make(chan []byte, 10) + done := make(chan error) + go l.StreamEntries(ts, uint32(1), ch, done) + count := 0 + for val := range ch { + count += 1 + if bytes.Compare(data, val) != 0 { + t.Error("Data doesn't equate.") + } + } + if err := <-done; err != nil { + t.Error(err) + } + if count != 4 { + t.Errorf("Expected 4 entries. Got: %v", count) + } + } +}