diff --git a/commit/cache.go b/commit/cache.go
index 6fdbe7469aa36dea19b604bfd6b0832012c8a2bb..f5a31b8ac2da1ede43893ccf6e01f5504997fa43 100644
--- a/commit/cache.go
+++ b/commit/cache.go
@@ -17,12 +17,15 @@
 package commit
 
 import (
+	"encoding/binary"
 	"errors"
 	"io"
 	"io/ioutil"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/willf/bloom"
 )
 
 var E_READ = errors.New("Unable to read")
@@ -32,13 +35,31 @@
 type Cache struct {
 	sync.RWMutex
 	buf        []byte
 	lastAccess int64
+	bf         *bloom.BloomFilter
+}
+
+func toBytes(hash uint32) []byte {
+	n := make([]byte, 8)
+	nlen := binary.PutUvarint(n, uint64(hash))
+	return n[:nlen]
+}
+
+func (c *Cache) Present(hash uint32) bool {
+	c.RLock()
+	defer c.RUnlock()
+
+	if c.bf == nil {
+		return true
+	}
+	return c.bf.Test(toBytes(hash))
 }
 
-func (c *Cache) Write(p []byte) (n int, err error) {
+func (c *Cache) Write(hash uint32, p []byte) (n int, err error) {
 	atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
 	c.Lock()
 	defer c.Unlock()
+	c.bf.Add(toBytes(hash))
 	c.buf = append(c.buf, p...)
 	return len(p), nil
 }
@@ -93,12 +114,8 @@ func FillCache(c *Cache, path string) error {
 	if err != nil {
 		return err
 	}
-	n, err := c.Write(buf)
-	if err != nil {
-		return err
-	}
-	if n < len(buf) {
-		return E_WRITE
-	}
+	c.Lock()
+	c.buf = buf
+	c.Unlock()
 	return nil
 }
diff --git a/commit/log.go b/commit/log.go
index 9a81304da61dcc85ea74ee3c96fc93dfd0148e43..4d81941f11c4a4567e271fd5de46e99d5bb821df 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -40,6 +40,7 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/willf/bloom"
 )
 
 var glog = x.Log("commitlog")
@@ -50,6 +51,7 @@ type logFile struct {
 	path  string
 	size  int64
 	cache *Cache
+	bf    *bloom.BloomFilter
 }
 
 func (lf *logFile) Cache() *Cache {
@@ -65,9 +67,34 @@ func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) {
 	if lf.cache != nil {
 		return
 	}
-	lf.cache = new(Cache)
-	if err := FillCache(lf.cache, lf.path); err != nil {
-		glog.WithError(err).WithField("path", lf.path).Fatal("Unable to fill cache.")
+	cache := new(Cache)
+	if err := FillCache(cache, lf.path); err != nil {
+		glog.WithError(err).WithField("path", lf.path).
+			Fatal("Unable to fill cache.")
+	}
+	// No need to acquire lock on cache, because it just
+	// got created.
+	createAndUpdateBloomFilter(cache)
+	lf.cache = cache
+}
+
+// Lock must have been acquired.
+func createAndUpdateBloomFilter(cache *Cache) {
+	hashes := make([]uint32, 50000)
+	hashes = hashes[:0]
+	if err := streamEntries(cache, 0, 0, func(hdr Header, record []byte) {
+		hashes = append(hashes, hdr.hash)
+	}); err != nil {
+		glog.WithError(err).Fatal("Unable to create bloom filters.")
+	}
+
+	n := 100000
+	if len(hashes) > n {
+		n = len(hashes)
+	}
+	cache.bf = bloom.NewWithEstimates(uint(n), 0.0001)
+	for _, hash := range hashes {
+		cache.bf.Add(toBytes(hash))
 	}
 }
 
@@ -267,6 +294,7 @@ func (l *Logger) Init() {
 			if ferr := FillCache(cache, path); ferr != nil {
 				glog.WithError(ferr).Fatal("Unable to write to cache.")
 			}
+			createAndUpdateBloomFilter(cache)
 			atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
 			lastTs, err := lastTimestamp(cache)
 			if err != nil {
@@ -388,6 +416,7 @@ func (l *Logger) rotateCurrent() error {
 	lf.path = newpath
 	lf.size = cf.size
 	lf.cache = cf.cache()
+	createAndUpdateBloomFilter(lf.cache)
 	l.list = append(l.list, lf)
 
 	l.createNew()
@@ -408,6 +437,7 @@ func (l *Logger) createNew() {
 	l.cf = new(CurFile)
 	l.cf.f = f
 	cache := new(Cache)
+	createAndUpdateBloomFilter(cache)
 	atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
 }
 
@@ -454,7 +484,7 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 		glog.WithError(err).Error("While writing to current file.")
 		return err
 	}
-	if _, err = cf.cache().Write(buf.Bytes()); err != nil {
+	if _, err = cf.cache().Write(hash, buf.Bytes()); err != nil {
 		glog.WithError(err).Error("While writing to current cache.")
 		return err
 	}
@@ -469,6 +499,8 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 	return nil
 }
 
+// streamEntries allows for hash to be zero.
+// This means iterate over all the entries.
 func streamEntries(cache *Cache,
 	afterTs int64, hash uint32,
 	iter LogIterator) error {
@@ -492,7 +524,7 @@ func streamEntries(cache *Cache,
 			return err
 		}
 
-		if hdr.hash == hash && hdr.ts >= afterTs {
+		if (hash == 0 || hdr.hash == hash) && hdr.ts >= afterTs {
 			// Iterator expects a copy of the buffer, so create one, instead of
 			// creating a big buffer upfront and reusing it.
 			data := make([]byte, hdr.size)
@@ -533,14 +565,16 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32,
 	l.RLock()
 	var caches []*Cache
 	for _, lf := range l.list {
-		if afterTs < lf.endTs {
+		if afterTs < lf.endTs && lf.cache.Present(hash) {
 			caches = append(caches, lf.Cache())
 		}
 	}
 	l.RUnlock()
 
 	// Add current cache.
-	caches = append(caches, l.curFile().cache())
+	if l.curFile().cache().Present(hash) {
+		caches = append(caches, l.curFile().cache())
+	}
 	for _, cache := range caches {
 		if cache == nil {
 			glog.Error("Cache is nil")