Skip to content
Snippets Groups Projects
Commit 24de201f authored by Manish R Jain's avatar Manish R Jain
Browse files

Ability to stream entries >= min timestamp with a given hash.

parent ff5374de
No related branches found
No related tags found
No related merge requests found
...@@ -46,7 +46,6 @@ type logFile struct { ...@@ -46,7 +46,6 @@ type logFile struct {
sync.RWMutex sync.RWMutex
endTs int64 // never modified after creation. endTs int64 // never modified after creation.
path string path string
f *os.File
size int64 size int64
} }
...@@ -247,7 +246,7 @@ func lastTimestamp(path string) (int64, error) { ...@@ -247,7 +246,7 @@ func lastTimestamp(path string) (int64, error) {
glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.") glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.")
} }
if err != nil { if err != nil {
glog.WithError(err).Error("While peeking into reader.") glog.WithError(err).Error("While reading header.")
return 0, err return 0, err
} }
count += 1 count += 1
...@@ -369,3 +368,80 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { ...@@ -369,3 +368,80 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
} }
return nil return nil
} }
func streamEntriesInFile(path string,
afterTs int64, hash uint32, ch chan []byte) error {
flog := glog.WithField("path", path)
f, err := os.Open(path)
if err != nil {
flog.WithError(err).Error("While opening file.")
return err
}
defer f.Close()
reader := bufio.NewReaderSize(f, 5<<20)
header := make([]byte, 16)
for {
n, err := reader.Read(header)
if err == io.EOF {
flog.Debug("File read complete.")
break
}
if n != len(header) {
flog.WithField("n", n).Fatal("Unable to read header.")
}
if err != nil {
flog.WithError(err).Error("While reading header.")
return err
}
hdr, err := parseHeader(header)
if err != nil {
flog.WithError(err).Error("While parsing header.")
return err
}
if hdr.hash == hash && hdr.ts >= afterTs {
data := make([]byte, hdr.size)
n, err := reader.Read(data)
if err != nil {
flog.WithError(err).Error("While reading data.")
return err
}
if int32(n) != hdr.size {
flog.WithField("n", n).Fatal("Unable to read data.")
}
ch <- data
} else {
reader.Discard(int(hdr.size))
}
}
return nil
}
func (l *Logger) StreamEntries(afterTs int64, hash uint32,
ch chan []byte, done chan error) {
var paths []string
l.Lock()
for _, lf := range l.list {
if afterTs < lf.endTs {
paths = append(paths, lf.path)
}
}
l.Unlock()
{
cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
paths = append(paths, cur)
}
for _, path := range paths {
if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil {
close(ch)
done <- err
return
}
}
close(ch)
done <- nil
}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package commit package commit
import ( import (
"bytes"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
...@@ -141,6 +142,9 @@ func TestRotatingLog(t *testing.T) { ...@@ -141,6 +142,9 @@ func TestRotatingLog(t *testing.T) {
if nl.size != 416 { if nl.size != 416 {
t.Errorf("Expected size 416. Got: %v", nl.size) t.Errorf("Expected size 416. Got: %v", nl.size)
} }
if err := nl.AddLog(ts, 0, data); err == nil {
t.Error("Adding an entry with older ts should fail.")
}
if err := nl.AddLog(ts+int64(100), 0, data); err != nil { if err := nl.AddLog(ts+int64(100), 0, data); err != nil {
t.Error(err) t.Error(err)
return return
...@@ -165,3 +169,87 @@ func TestRotatingLog(t *testing.T) { ...@@ -165,3 +169,87 @@ func TestRotatingLog(t *testing.T) {
t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs) t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs)
} }
} }
func TestReadEntries(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 1024) // 1 kB
l.SyncDur = 0
l.SyncEvery = 0
l.Init()
defer l.Close()
data := make([]byte, 400)
ts := time.Now().UnixNano()
for i := 0; i < 9; i++ {
curts := ts + int64(i)
if err := l.AddLog(curts, uint32(i%3), data); err != nil {
t.Error(err)
return
}
}
// This should have created 4 files of 832 bytes each (header + data), and
// the current file should be of size 416.
if len(l.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(l.list))
}
for i, lf := range l.list {
exp := ts + int64(2*i+1)
if lf.endTs != exp {
t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
}
}
if l.size != 416 {
t.Errorf("Expected size 416. Got: %v", l.size)
}
if l.lastLogTs != ts+int64(8) {
t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs)
}
{
// Check for hash = 1, ts >= 2.
ch := make(chan []byte, 10)
done := make(chan error)
go l.StreamEntries(ts+2, uint32(1), ch, done)
count := 0
for val := range ch {
count += 1
if bytes.Compare(data, val) != 0 {
t.Error("Data doesn't equate.")
}
}
if err := <-done; err != nil {
t.Error(err)
}
if count != 2 {
t.Errorf("Expected 2 entries. Got: %v", count)
}
}
{
if err := l.AddLog(ts+int64(9), 1, data); err != nil {
t.Error(err)
}
// Check for hash = 1, ts >= 2.
ch := make(chan []byte, 10)
done := make(chan error)
go l.StreamEntries(ts, uint32(1), ch, done)
count := 0
for val := range ch {
count += 1
if bytes.Compare(data, val) != 0 {
t.Error("Data doesn't equate.")
}
}
if err := <-done; err != nil {
t.Error(err)
}
if count != 4 {
t.Errorf("Expected 4 entries. Got: %v", count)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment