Commit 06701f5f authored by Manish R Jain

Use cache for streaming entries instead of opening files for each posting list. Use iterator instead of channel while streaming entries.
parent 7e5ea205
@@ -21,16 +21,22 @@ import (
"io"
"io/ioutil"
"sync"
"sync/atomic"
"time"
)
var E_READ = errors.New("Unable to read")
var E_WRITE = errors.New("Unable to write")
type Cache struct {
sync.RWMutex
-	buf []byte
+	buf        []byte
+	lastAccess int64
}
func (c *Cache) Write(p []byte) (n int, err error) {
+	atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
c.Lock()
defer c.Unlock()
c.buf = append(c.buf, p...)
@@ -38,6 +44,8 @@ func (c *Cache) Write(p []byte) (n int, err error) {
}
func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) {
+	atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
c.RLock()
defer c.RUnlock()
@@ -46,9 +54,17 @@ func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) {
}
n = copy(p, c.buf[pos:])
if n < len(p) {
return n, E_READ
}
return n, nil
}
+func (c *Cache) LastAccessedInSeconds() int64 {
+	d := atomic.LoadInt64(&c.lastAccess)
+	return (time.Now().UnixNano() - d) / 1000000000
+}
+
+// Reader isn't thread-safe. But multiple readers can be used to read the
+// same cache.
type Reader struct {
......
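The two atomic.StoreInt64 calls above stamp the cache on every Write and ReadAt without touching the RWMutex, and LastAccessedInSeconds reads the stamp lock-free, which is what the eviction pass in log.go below relies on. A minimal standalone sketch of the same pattern; miniCache and everything in it are illustrative names, not code from this repo:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
		"time"
	)

	// miniCache mirrors Cache above: an append-only buffer guarded by a
	// RWMutex, plus a last-access timestamp maintained with atomics so
	// stamping an access never waits on the mutex.
	type miniCache struct {
		sync.RWMutex
		buf        []byte
		lastAccess int64 // UnixNano of the most recent access
	}

	func (c *miniCache) Write(p []byte) (int, error) {
		atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
		c.Lock()
		defer c.Unlock()
		c.buf = append(c.buf, p...)
		return len(p), nil
	}

	// LastAccessedInSeconds reports how long ago the cache was last touched.
	func (c *miniCache) LastAccessedInSeconds() int64 {
		d := atomic.LoadInt64(&c.lastAccess)
		return (time.Now().UnixNano() - d) / int64(time.Second) // same as /1e9
	}

	func main() {
		c := new(miniCache)
		c.Write([]byte("posting"))
		time.Sleep(1500 * time.Millisecond)
		fmt.Println(c.LastAccessedInSeconds()) // prints 1
	}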
@@ -23,13 +23,13 @@
package commit
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
@@ -52,6 +52,25 @@ type logFile struct {
cache *Cache
}
+func (lf *logFile) Cache() *Cache {
+	lf.RLock()
+	defer lf.RUnlock()
+	return lf.cache
+}
+
+func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) {
+	lf.Lock()
+	defer lf.Unlock()
+	defer wg.Done()
+	if lf.cache != nil {
+		return
+	}
+	lf.cache = new(Cache)
+	if err := FillCache(lf.cache, lf.path); err != nil {
+		glog.WithError(err).WithField("path", lf.path).Fatal("Unable to fill cache.")
+	}
+}
type CurFile struct {
sync.RWMutex
f *os.File
@@ -61,6 +80,11 @@ type CurFile struct {
}
func (c *CurFile) cache() *Cache {
+	if c == nil {
+		debug.PrintStack()
+		// This got triggered due to a premature cleanup in query_test.go
+	}
v := atomic.LoadPointer(&c.cch)
if v == nil {
return nil
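CurFile.cache() loads the cch field with atomic.LoadPointer, which means cch is an unsafe.Pointer that gets swapped atomically rather than guarded by CurFile's mutex. Note that when c is nil, debug.PrintStack fires, but the LoadPointer on the next line still dereferences the nil receiver and panics; the stack trace exists to identify the offending caller first. A standalone sketch of the load/store pair; holder and payload are hypothetical names:

	package main

	import (
		"fmt"
		"sync/atomic"
		"unsafe"
	)

	type payload struct{ n int }

	// holder publishes a *payload to concurrent readers without a mutex.
	type holder struct {
		p unsafe.Pointer // always holds a *payload; accessed only via atomics
	}

	func (h *holder) load() *payload {
		v := atomic.LoadPointer(&h.p)
		if v == nil {
			return nil // nothing published yet
		}
		return (*payload)(v)
	}

	func (h *holder) store(x *payload) {
		atomic.StorePointer(&h.p, unsafe.Pointer(x))
	}

	func main() {
		var h holder
		fmt.Println(h.load() == nil) // true
		h.store(&payload{n: 42})
		fmt.Println(h.load().n) // 42
	}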
@@ -137,6 +161,19 @@ func (l *Logger) updateLastLogTs(val int64) {
}
}
+func (l *Logger) DeleteCacheOlderThan(v time.Duration) {
+	l.RLock()
+	defer l.RUnlock()
+	s := int64(v.Seconds())
+	for _, lf := range l.list {
+		if lf.Cache().LastAccessedInSeconds() > s {
+			lf.Lock()
+			lf.cache = nil
+			lf.Unlock()
+		}
+	}
+}
func (l *Logger) periodicSync() {
glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
if l.SyncDur == 0 {
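DeleteCacheOlderThan walks the closed log files under a read lock and clears any cache idle longer than the given duration, taking each logFile's own write lock to drop the pointer. One caveat visible in the code: lf.Cache() returns nil for a file that was never filled or was already evicted, and LastAccessedInSeconds on a nil *Cache panics, so the loop assumes every listed file currently holds a cache. A sketch of a periodic driver, assuming a *Logger named l; evictLoop and both durations are illustrative:

	// evictLoop drops commit-log caches that have been idle longer than
	// maxIdle, checking once per `every`. Run it in its own goroutine,
	// e.g. go evictLoop(clog, time.Minute, 10*time.Minute) after Init.
	func evictLoop(l *Logger, every, maxIdle time.Duration) {
		t := time.NewTicker(every)
		defer t.Stop()
		for range t.C {
			l.DeleteCacheOlderThan(maxIdle)
		}
	}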
@@ -447,87 +484,82 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
return nil
}
-func streamEntriesInFile(path string,
-	afterTs int64, hash uint32, ch chan []byte) error {
-	flog := glog.WithField("path", path)
-	f, err := os.Open(path)
-	if err != nil {
-		flog.WithError(err).Error("While opening file.")
-		return err
-	}
-	defer f.Close()
+func streamEntries(cache *Cache,
+	afterTs int64, hash uint32, iter LogIterator) error {
-	discard := make([]byte, 4096)
-	reader := bufio.NewReaderSize(f, 5<<20)
+	flog := glog
+	reader := NewReader(cache)
header := make([]byte, 16)
for {
-		n, err := reader.Read(header)
+		_, err := reader.Read(header)
if err == io.EOF {
flog.Debug("File read complete.")
flog.Debug("Cache read complete.")
break
}
-		if n != len(header) {
-			flog.WithField("n", n).Fatal("Unable to read header.")
-		}
if err != nil {
flog.WithError(err).Error("While reading header.")
flog.WithError(err).Fatal("While reading header.")
return err
}
hdr, err := parseHeader(header)
if err != nil {
flog.WithError(err).Error("While parsing header.")
return err
}
if hdr.hash == hash && hdr.ts >= afterTs {
data := make([]byte, hdr.size)
-			n, err := reader.Read(data)
+			_, err := reader.Read(data)
if err != nil {
flog.WithError(err).Error("While reading data.")
flog.WithError(err).Fatal("While reading data.")
return err
}
-			if int32(n) != hdr.size {
-				flog.WithField("n", n).Fatal("Unable to read data.")
-			}
-			ch <- data
+			iter(data)
} else {
-			for int(hdr.size) > len(discard) {
-				discard = make([]byte, len(discard)*2)
-			}
-			reader.Read(discard[:int(hdr.size)])
+			reader.Discard(int(hdr.size))
}
}
return nil
}
+type LogIterator func(record []byte)
+
-// Always run this method in it's own goroutine. Otherwise, your program
-// will just hang waiting on channels.
func (l *Logger) StreamEntries(afterTs int64, hash uint32,
-	ch chan []byte, done chan error) {
+	iter LogIterator) error {
-	var paths []string
+	var wg sync.WaitGroup
l.RLock()
for _, lf := range l.list {
if afterTs < lf.endTs {
-			paths = append(paths, lf.path)
+			wg.Add(1)
+			go lf.FillIfEmpty(&wg)
}
}
l.RUnlock()
+	wg.Wait()
-	{
-		cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
-		if _, err := os.Stat(cur); err == nil {
-			paths = append(paths, cur)
+	l.RLock()
+	var caches []*Cache
+	for _, lf := range l.list {
+		if afterTs < lf.endTs {
+			caches = append(caches, lf.Cache())
}
}
-	for _, path := range paths {
-		if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil {
-			close(ch)
-			done <- err
-			return
+	l.RUnlock()
+
+	// Add current cache.
+	caches = append(caches, l.curFile().cache())
+	for _, cache := range caches {
+		if cache == nil {
+			glog.Error("Cache is nil")
+			continue
}
+		if err := streamEntries(cache, afterTs, hash, iter); err != nil {
+			return err
}
}
-	close(ch)
-	done <- nil
+	return nil
}
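StreamEntries is now synchronous: it first warms every relevant cache in parallel (one FillIfEmpty goroutine per file, joined by wg.Wait), then replays each cache through the iterator and returns the first error directly, so callers no longer need the goroutine-plus-two-channels dance. The warm-up is the standard sync.WaitGroup fan-out; a standalone sketch, where fillCache stands in for this package's FillCache:

	package main

	import (
		"fmt"
		"sync"
	)

	// fillCache stands in for commit.FillCache: it warms one file's cache.
	func fillCache(path string) {
		fmt.Println("filling cache for", path)
	}

	func main() {
		paths := []string{"a.log", "b.log", "c.log"}
		var wg sync.WaitGroup
		for _, p := range paths {
			wg.Add(1) // count up before spawning, never inside the goroutine
			go func(p string) {
				defer wg.Done()
				fillCache(p)
			}(p)
		}
		wg.Wait() // every cache is warm before streaming begins
	}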
@@ -211,17 +211,14 @@ func TestReadEntries(t *testing.T) {
{
// Check for hash = 1, ts >= 2.
-		ch := make(chan []byte, 10)
-		done := make(chan error)
-		go l.StreamEntries(ts+2, uint32(1), ch, done)
count := 0
-		for val := range ch {
+		err := l.StreamEntries(ts+2, uint32(1), func(entry []byte) {
count += 1
-			if bytes.Compare(data, val) != 0 {
+			if bytes.Compare(data, entry) != 0 {
t.Error("Data doesn't equate.")
}
-		}
-		if err := <-done; err != nil {
+		})
+		if err != nil {
t.Error(err)
}
if count != 2 {
@@ -233,17 +230,14 @@
t.Error(err)
}
// Check for hash = 1, ts >= 2.
-	ch := make(chan []byte, 10)
-	done := make(chan error)
-	go l.StreamEntries(ts, uint32(1), ch, done)
count := 0
-	for val := range ch {
+	err := l.StreamEntries(ts, uint32(1), func(entry []byte) {
count += 1
-		if bytes.Compare(data, val) != 0 {
+		if bytes.Compare(data, entry) != 0 {
t.Error("Data doesn't equate.")
}
-	}
-	if err := <-done; err != nil {
+	})
+	if err != nil {
t.Error(err)
}
if count != 4 {
......
@@ -20,7 +20,6 @@ import (
"fmt"
"testing"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/query"
)
@@ -77,7 +76,7 @@ func TestParse(t *testing.T) {
}
func TestParseXid(t *testing.T) {
-	logrus.SetLevel(logrus.DebugLevel)
+	// logrus.SetLevel(logrus.DebugLevel)
query := `
query {
user(_uid_: 0x11) {
......
@@ -242,12 +242,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
l.hash = farm.Fingerprint32(key)
l.mlayer = make(map[int]types.Posting)
-	ch := make(chan []byte, 100)
-	done := make(chan error)
	glog.Debug("Starting stream entries...")
-	go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done)
-	for buffer := range ch {
+	err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) {
uo := flatbuffers.GetUOffsetT(buffer)
m := new(types.Posting)
m.Init(buffer, uo)
@@ -260,8 +256,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
"ts": m.Ts(),
}).Debug("Got entry from log")
l.mergeMutation(m)
-	}
-	if err := <-done; err != nil {
+	})
+	if err != nil {
glog.WithError(err).Error("While streaming entries.")
}
glog.Debug("Done streaming entries.")
......
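With the iterator, List.init consumes each record inline and receives the error as a plain return value, so no channel is left open and no goroutine leaks if merging fails midway. A caller that genuinely wants channel semantics can rebuild them on top of the callback; a sketch with a hypothetical helper streamToChannel (not part of this commit), assuming the commit package above:

	// streamToChannel adapts the callback-based StreamEntries back into a
	// data channel plus an error channel for consumers that prefer range loops.
	func streamToChannel(clog *commit.Logger, afterTs int64,
		hash uint32) (<-chan []byte, <-chan error) {
		entries := make(chan []byte, 100)
		errc := make(chan error, 1)
		go func() {
			defer close(entries)
			errc <- clog.StreamEntries(afterTs, hash, func(rec []byte) {
				entries <- rec
			})
		}()
		return entries, errc
	}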
@@ -23,7 +23,6 @@ import (
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
@@ -190,21 +189,19 @@ func TestNewGraph(t *testing.T) {
}
}
-func populateGraph(t *testing.T) {
-	logrus.SetLevel(logrus.DebugLevel)
+func populateGraph(t *testing.T) string {
+	// logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
-		return
+		return ""
}
-	defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(dir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
-	defer clog.Close()
posting.Init(ps, clog)
// So, user we're interested in has uid: 1.
@@ -250,10 +247,13 @@ func populateGraph(t *testing.T) {
edge.Value = "Andrea"
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
+
+	return dir
}
func TestProcessGraph(t *testing.T) {
-	populateGraph(t)
+	dir := populateGraph(t)
+	defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
sg, err := NewGraph(1, "")
@@ -346,7 +346,8 @@ func TestProcessGraph(t *testing.T) {
}
func TestToJson(t *testing.T) {
-	populateGraph(t)
+	dir := populateGraph(t)
+	defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
sg, err := NewGraph(1, "")
@@ -379,7 +380,7 @@ func TestToJson(t *testing.T) {
t.Error(err)
}
-	js, err := sg.ToJson()
+	var l Latency
+	js, err := sg.ToJson(&l)
if err != nil {
t.Error(err)
}
......
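populateGraph now returns the temp directory and each test defers os.RemoveAll itself; the old defer inside populateGraph tore the directory down as soon as setup returned, while the store and commit log were still in use. That is the "premature cleanup in query_test.go" referenced by the nil check in CurFile.cache() above. On Go 1.15 and later, t.TempDir() expresses the same caller-scoped lifetime without any manual defer; a sketch:

	// t.TempDir registers removal via t.Cleanup, which runs only after the
	// calling test finishes, so neither populateGraph nor its callers need
	// a defer.
	func populateGraph(t *testing.T) string {
		dir := t.TempDir()
		// ... initialize the store, commit log, and edges under dir ...
		return dir
	}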
@@ -72,7 +72,7 @@ func main() {
defer ps.Close()
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SkipWrite = true // Don't write to commit logs.
+	clog.SetSkipWrite(true) // Don't write to commit logs.
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
......
@@ -107,7 +107,7 @@ func main() {
defer ps.Close()
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SkipWrite = false
+	clog.SetSkipWrite(false)
clog.SyncEvery = 1
clog.Init()
defer clog.Close()
......
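Both binaries stop assigning the exported SkipWrite field and call SetSkipWrite instead. The setter's definition is not part of this diff; a plausible shape, assuming the flag became an unexported field guarded by the Logger's own mutex:

	// Hypothetical sketch only; the real accessor lives elsewhere in the
	// commit package.
	func (l *Logger) SetSkipWrite(v bool) {
		l.Lock()
		defer l.Unlock()
		l.skipWrite = v
	}

Hiding the flag behind a setter keeps it consistent under the same lock that guards the rest of the Logger's state.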