diff --git a/commit/cache.go b/commit/cache.go index 2a05d5d4ce556b7b419d700be47c481d77dd34ba..6fdbe7469aa36dea19b604bfd6b0832012c8a2bb 100644 --- a/commit/cache.go +++ b/commit/cache.go @@ -21,16 +21,22 @@ import ( "io" "io/ioutil" "sync" + "sync/atomic" + "time" ) +var E_READ = errors.New("Unable to read") var E_WRITE = errors.New("Unable to write") type Cache struct { sync.RWMutex - buf []byte + buf []byte + lastAccess int64 } func (c *Cache) Write(p []byte) (n int, err error) { + atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano()) + c.Lock() defer c.Unlock() c.buf = append(c.buf, p...) @@ -38,6 +44,8 @@ func (c *Cache) Write(p []byte) (n int, err error) { } func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) { + atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano()) + c.RLock() defer c.RUnlock() @@ -46,9 +54,17 @@ func (c *Cache) ReadAt(pos int, p []byte) (n int, err error) { } n = copy(p, c.buf[pos:]) + if n < len(p) { + return n, E_READ + } return n, nil } +func (c *Cache) LastAccessedInSeconds() int64 { + d := atomic.LoadInt64(&c.lastAccess) + return (time.Now().UnixNano() - d) / 1000000000 +} + // Reader isn't thread-safe. But multiple readers can be used to read the // same cache. type Reader struct { diff --git a/commit/log.go b/commit/log.go index ba1b75d7586397537a61fa55ceca29ce97baf98c..30583308b2912f0a772dd408d05da438a7fe4214 100644 --- a/commit/log.go +++ b/commit/log.go @@ -23,13 +23,13 @@ package commit import ( - "bufio" "bytes" "encoding/binary" "fmt" "io" "os" "path/filepath" + "runtime/debug" "sort" "strconv" "strings" @@ -52,6 +52,25 @@ type logFile struct { cache *Cache } +func (lf *logFile) Cache() *Cache { + lf.RLock() + defer lf.RUnlock() + return lf.cache +} + +func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) { + lf.Lock() + defer lf.Unlock() + defer wg.Done() + if lf.cache != nil { + return + } + lf.cache = new(Cache) + if err := FillCache(lf.cache, lf.path); err != nil { + glog.WithError(err).WithField("path", lf.path).Fatal("Unable to fill cache.") + } +} + type CurFile struct { sync.RWMutex f *os.File @@ -61,6 +80,11 @@ type CurFile struct { } func (c *CurFile) cache() *Cache { + if c == nil { + debug.PrintStack() + // This got triggered due to a premature cleanup in query_test.go + } + v := atomic.LoadPointer(&c.cch) if v == nil { return nil @@ -137,6 +161,19 @@ func (l *Logger) updateLastLogTs(val int64) { } } +func (l *Logger) DeleteCacheOlderThan(v time.Duration) { + l.RLock() + defer l.RUnlock() + s := int64(v.Seconds()) + for _, lf := range l.list { + if lf.Cache().LastAccessedInSeconds() > s { + lf.Lock() + lf.cache = nil + lf.Unlock() + } + } +} + func (l *Logger) periodicSync() { glog.WithField("dur", l.SyncDur).Debug("Periodic sync.") if l.SyncDur == 0 { @@ -447,87 +484,82 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { return nil } -func streamEntriesInFile(path string, - afterTs int64, hash uint32, ch chan []byte) error { - - flog := glog.WithField("path", path) - f, err := os.Open(path) - if err != nil { - flog.WithError(err).Error("While opening file.") - return err - } - defer f.Close() +func streamEntries(cache *Cache, + afterTs int64, hash uint32, iter LogIterator) error { - discard := make([]byte, 4096) - reader := bufio.NewReaderSize(f, 5<<20) + flog := glog + reader := NewReader(cache) header := make([]byte, 16) for { - n, err := reader.Read(header) + _, err := reader.Read(header) if err == io.EOF { - flog.Debug("File read complete.") + flog.Debug("Cache read complete.") break } - if n != len(header) { - flog.WithField("n", n).Fatal("Unable to read header.") - } if err != nil { - flog.WithError(err).Error("While reading header.") + flog.WithError(err).Fatal("While reading header.") return err } + hdr, err := parseHeader(header) if err != nil { flog.WithError(err).Error("While parsing header.") return err } + if hdr.hash == hash && hdr.ts >= afterTs { data := make([]byte, hdr.size) - n, err := reader.Read(data) + _, err := reader.Read(data) if err != nil { - flog.WithError(err).Error("While reading data.") + flog.WithError(err).Fatal("While reading data.") return err } - if int32(n) != hdr.size { - flog.WithField("n", n).Fatal("Unable to read data.") - } - ch <- data + iter(data) } else { - for int(hdr.size) > len(discard) { - discard = make([]byte, len(discard)*2) - } - reader.Read(discard[:int(hdr.size)]) + reader.Discard(int(hdr.size)) } } return nil } +type LogIterator func(record []byte) + // Always run this method in it's own goroutine. Otherwise, your program // will just hang waiting on channels. func (l *Logger) StreamEntries(afterTs int64, hash uint32, - ch chan []byte, done chan error) { + iter LogIterator) error { - var paths []string + var wg sync.WaitGroup l.RLock() for _, lf := range l.list { if afterTs < lf.endTs { - paths = append(paths, lf.path) + wg.Add(1) + go lf.FillIfEmpty(&wg) } } l.RUnlock() + wg.Wait() - { - cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) - if _, err := os.Stat(cur); err == nil { - paths = append(paths, cur) + l.RLock() + var caches []*Cache + for _, lf := range l.list { + if afterTs < lf.endTs { + caches = append(caches, lf.Cache()) } } - for _, path := range paths { - if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil { - close(ch) - done <- err - return + l.RUnlock() + + // Add current cache. + caches = append(caches, l.curFile().cache()) + for _, cache := range caches { + if cache == nil { + glog.Error("Cache is nil") + continue + } + if err := streamEntries(cache, afterTs, hash, iter); err != nil { + return err } } - close(ch) - done <- nil + return nil } diff --git a/commit/log_test.go b/commit/log_test.go index ad2e1e527e50c5103f89e18e48c40079a3f48fbf..c0e2ab6bd87e66ab5bea72345a8098ed7aab6e22 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -211,17 +211,14 @@ func TestReadEntries(t *testing.T) { { // Check for hash = 1, ts >= 2. - ch := make(chan []byte, 10) - done := make(chan error) - go l.StreamEntries(ts+2, uint32(1), ch, done) count := 0 - for val := range ch { + err := l.StreamEntries(ts+2, uint32(1), func(entry []byte) { count += 1 - if bytes.Compare(data, val) != 0 { + if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") } - } - if err := <-done; err != nil { + }) + if err != nil { t.Error(err) } if count != 2 { @@ -233,17 +230,14 @@ func TestReadEntries(t *testing.T) { t.Error(err) } // Check for hash = 1, ts >= 2. - ch := make(chan []byte, 10) - done := make(chan error) - go l.StreamEntries(ts, uint32(1), ch, done) count := 0 - for val := range ch { + err := l.StreamEntries(ts, uint32(1), func(entry []byte) { count += 1 - if bytes.Compare(data, val) != 0 { + if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") } - } - if err := <-done; err != nil { + }) + if err != nil { t.Error(err) } if count != 4 { diff --git a/gql/parser_test.go b/gql/parser_test.go index ab17f0a225da40a8d66abb7d2f5a6cd63968339c..b9cf369d43c8be68458beddd2bd3198c86a9b3e5 100644 --- a/gql/parser_test.go +++ b/gql/parser_test.go @@ -20,7 +20,6 @@ import ( "fmt" "testing" - "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/query" ) @@ -77,7 +76,7 @@ func TestParse(t *testing.T) { } func TestParseXid(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) + // logrus.SetLevel(logrus.DebugLevel) query := ` query { user(_uid_: 0x11) { diff --git a/posting/list.go b/posting/list.go index af53ede64117964f5f91aed1224bc79ab9746d07..a1b6a29a74dfbc9058bb4f32e0899d1041340014 100644 --- a/posting/list.go +++ b/posting/list.go @@ -242,12 +242,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { l.hash = farm.Fingerprint32(key) l.mlayer = make(map[int]types.Posting) - ch := make(chan []byte, 100) - done := make(chan error) glog.Debug("Starting stream entries...") - go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done) - - for buffer := range ch { + err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) { uo := flatbuffers.GetUOffsetT(buffer) m := new(types.Posting) m.Init(buffer, uo) @@ -260,8 +256,8 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { "ts": m.Ts(), }).Debug("Got entry from log") l.mergeMutation(m) - } - if err := <-done; err != nil { + }) + if err != nil { glog.WithError(err).Error("While streaming entries.") } glog.Debug("Done streaming entries.") diff --git a/query/query_test.go b/query/query_test.go index e60c308489b6878971d2a9a7f05d202e830bc0ca..549127bb6df81a8d3840baf4032e27b1408bf650 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -23,7 +23,6 @@ import ( "testing" "time" - "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" @@ -190,21 +189,19 @@ func TestNewGraph(t *testing.T) { } } -func populateGraph(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) +func populateGraph(t *testing.T) string { + // logrus.SetLevel(logrus.DebugLevel) dir, err := ioutil.TempDir("", "storetest_") if err != nil { t.Error(err) - return + return "" } - defer os.RemoveAll(dir) ps := new(store.Store) ps.Init(dir) clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() - defer clog.Close() posting.Init(ps, clog) // So, user we're interested in has uid: 1. @@ -250,10 +247,13 @@ func populateGraph(t *testing.T) { edge.Value = "Andrea" addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"))) + + return dir } func TestProcessGraph(t *testing.T) { - populateGraph(t) + dir := populateGraph(t) + defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. sg, err := NewGraph(1, "") @@ -346,7 +346,8 @@ func TestProcessGraph(t *testing.T) { } func TestToJson(t *testing.T) { - populateGraph(t) + dir := populateGraph(t) + defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. sg, err := NewGraph(1, "") @@ -379,7 +380,8 @@ func TestToJson(t *testing.T) { t.Error(err) } - js, err := sg.ToJson() + var l Latency + js, err := sg.ToJson(&l) if err != nil { t.Error(err) } diff --git a/server/loader/main.go b/server/loader/main.go index 7cb23237372092693432e52e069a063bb01dd586..2390f484619a24d983841eb1980f141df45d8744 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -72,7 +72,7 @@ func main() { defer ps.Close() clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20) - clog.SkipWrite = true // Don't write to commit logs. + clog.SetSkipWrite(true) // Don't write to commit logs. clog.Init() defer clog.Close() posting.Init(ps, clog) diff --git a/server/main.go b/server/main.go index e502c161ffe5132ef9b387845e7f40514eba3a70..1fc3b65d3c7fc15da0308bd67ef611e4f6e56e10 100644 --- a/server/main.go +++ b/server/main.go @@ -107,7 +107,7 @@ func main() { defer ps.Close() clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20) - clog.SkipWrite = false + clog.SetSkipWrite(false) clog.SyncEvery = 1 clog.Init() defer clog.Close()