Commit 0e423671 authored by Manish R Jain

Add bloom filter to log caches

parent 0495f7d8
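This commit attaches a bloom filter (github.com/willf/bloom) to each log cache, keyed by the varint-encoded entry hash, so that readers can skip caches that cannot contain a given hash. As orientation, here is a minimal, self-contained sketch of that pattern (not the commit's code), using the same library calls (NewWithEstimates, Add, Test) and the same hash encoding as the toBytes helper below:

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/willf/bloom"
)

// toBytes mirrors the commit's helper: it encodes a uint32 hash as
// varint bytes, which become the key stored in the bloom filter.
func toBytes(hash uint32) []byte {
    n := make([]byte, 8)
    nlen := binary.PutUvarint(n, uint64(hash))
    return n[:nlen]
}

func main() {
    // Sized for 100000 expected entries at a 0.01% false-positive rate,
    // the same parameters the commit passes to NewWithEstimates.
    bf := bloom.NewWithEstimates(100000, 0.0001)
    bf.Add(toBytes(42))

    // Test never reports a false negative, so "false" means the entry is
    // definitely absent and the cache can be skipped; "true" may still be
    // a false positive, in which case the scan simply finds nothing.
    fmt.Println(bf.Test(toBytes(42))) // true
    fmt.Println(bf.Test(toBytes(99))) // false with high probability
}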
@@ -17,12 +17,15 @@
 package commit
 import (
+    "encoding/binary"
     "errors"
     "io"
     "io/ioutil"
     "sync"
     "sync/atomic"
     "time"
+    "github.com/willf/bloom"
 )
 var E_READ = errors.New("Unable to read")
@@ -32,13 +35,31 @@ type Cache struct {
     sync.RWMutex
     buf []byte
     lastAccess int64
+    bf *bloom.BloomFilter
 }
+func toBytes(hash uint32) []byte {
+    n := make([]byte, 8)
+    nlen := binary.PutUvarint(n, uint64(hash))
+    return n[:nlen]
+}
+func (c *Cache) Present(hash uint32) bool {
+    c.RLock()
+    defer c.RUnlock()
+    if c.bf == nil {
+        return true
+    }
+    return c.bf.Test(toBytes(hash))
+}
-func (c *Cache) Write(p []byte) (n int, err error) {
+func (c *Cache) Write(hash uint32, p []byte) (n int, err error) {
     atomic.StoreInt64(&c.lastAccess, time.Now().UnixNano())
     c.Lock()
     defer c.Unlock()
+    c.bf.Add(toBytes(hash))
     c.buf = append(c.buf, p...)
     return len(p), nil
 }
@@ -93,12 +114,8 @@ func FillCache(c *Cache, path string) error {
     if err != nil {
         return err
     }
-    n, err := c.Write(buf)
-    if err != nil {
-        return err
-    }
-    if n < len(buf) {
-        return E_WRITE
-    }
+    c.Lock()
+    c.buf = buf
+    c.Unlock()
     return nil
 }
@@ -40,6 +40,7 @@ import (
     "github.com/Sirupsen/logrus"
     "github.com/dgraph-io/dgraph/x"
+    "github.com/willf/bloom"
 )
 var glog = x.Log("commitlog")
@@ -50,6 +51,7 @@ type logFile struct {
     path string
     size int64
     cache *Cache
+    bf *bloom.BloomFilter
 }
 func (lf *logFile) Cache() *Cache {
@@ -65,9 +67,34 @@ func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) {
     if lf.cache != nil {
         return
     }
-    lf.cache = new(Cache)
-    if err := FillCache(lf.cache, lf.path); err != nil {
-        glog.WithError(err).WithField("path", lf.path).Fatal("Unable to fill cache.")
+    cache := new(Cache)
+    if err := FillCache(cache, lf.path); err != nil {
+        glog.WithError(err).WithField("path", lf.path).
+            Fatal("Unable to fill cache.")
     }
+    // No need to acquire lock on cache, because it just
+    // got created.
+    createAndUpdateBloomFilter(cache)
+    lf.cache = cache
 }
+// Lock must have been acquired.
+func createAndUpdateBloomFilter(cache *Cache) {
+    hashes := make([]uint32, 50000)
+    hashes = hashes[:0]
+    if err := streamEntries(cache, 0, 0, func(hdr Header, record []byte) {
+        hashes = append(hashes, hdr.hash)
+    }); err != nil {
+        glog.WithError(err).Fatal("Unable to create bloom filters.")
+    }
+    n := 100000
+    if len(hashes) > n {
+        n = len(hashes)
+    }
+    cache.bf = bloom.NewWithEstimates(uint(n), 0.0001)
+    for _, hash := range hashes {
+        cache.bf.Add(toBytes(hash))
+    }
+}
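createAndUpdateBloomFilter replays every entry in the cache (passing hash 0 to streamEntries means iterate over all entries), collects the hashes, and builds a filter sized for max(100000, number of hashes) at a 0.0001 false-positive rate. As a rough guide to the cost, a 0.01% rate needs about 19 bits per key, so the 100000-entry floor is on the order of a couple hundred kilobytes per cache. A small sketch, assuming willf/bloom's EstimateParameters helper, to check those numbers:

package main

import (
    "fmt"

    "github.com/willf/bloom"
)

func main() {
    // m is the number of bits, k the number of hash functions needed
    // for n expected keys at false-positive rate p.
    m, k := bloom.EstimateParameters(100000, 0.0001)
    fmt.Printf("bits=%d (~%d KB), hash functions=%d\n", m, m/8/1024, k)
}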
@@ -267,6 +294,7 @@ func (l *Logger) Init() {
         if ferr := FillCache(cache, path); ferr != nil {
             glog.WithError(ferr).Fatal("Unable to write to cache.")
         }
+        createAndUpdateBloomFilter(cache)
         atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
         lastTs, err := lastTimestamp(cache)
         if err != nil {
@@ -388,6 +416,7 @@ func (l *Logger) rotateCurrent() error {
     lf.path = newpath
     lf.size = cf.size
     lf.cache = cf.cache()
+    createAndUpdateBloomFilter(lf.cache)
     l.list = append(l.list, lf)
     l.createNew()
@@ -408,6 +437,7 @@ func (l *Logger) createNew() {
     l.cf = new(CurFile)
     l.cf.f = f
     cache := new(Cache)
+    createAndUpdateBloomFilter(cache)
     atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache))
 }
@@ -454,7 +484,7 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
         glog.WithError(err).Error("While writing to current file.")
         return err
     }
-    if _, err = cf.cache().Write(buf.Bytes()); err != nil {
+    if _, err = cf.cache().Write(hash, buf.Bytes()); err != nil {
         glog.WithError(err).Error("While writing to current cache.")
         return err
     }
@@ -469,6 +499,8 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
     return nil
 }
+// streamEntries allows for hash to be zero.
+// This means iterate over all the entries.
 func streamEntries(cache *Cache,
     afterTs int64, hash uint32, iter LogIterator) error {
@@ -492,7 +524,7 @@ func streamEntries(cache *Cache,
         return err
     }
-    if hdr.hash == hash && hdr.ts >= afterTs {
+    if (hash == 0 || hdr.hash == hash) && hdr.ts >= afterTs {
         // Iterator expects a copy of the buffer, so create one, instead of
         // creating a big buffer upfront and reusing it.
         data := make([]byte, hdr.size)
@@ -533,14 +565,16 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32,
     l.RLock()
     var caches []*Cache
     for _, lf := range l.list {
-        if afterTs < lf.endTs {
+        if afterTs < lf.endTs && lf.cache.Present(hash) {
             caches = append(caches, lf.Cache())
         }
     }
     l.RUnlock()
     // Add current cache.
-    caches = append(caches, l.curFile().cache())
+    if l.curFile().cache().Present(hash) {
+        caches = append(caches, l.curFile().cache())
+    }
     for _, cache := range caches {
         if cache == nil {
             glog.Error("Cache is nil")
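The StreamEntries change above is where the filter pays off: a cache is only scanned if Present(hash) says the hash might be there. A minimal sketch of that selection step, written as if it lived in the same package (selectCaches is a hypothetical helper, not part of the commit):

// selectCaches keeps only the caches that might contain hash.
// Present never returns a false negative, so dropping a cache on a
// "false" answer cannot lose entries; a false positive just costs one
// extra scan that finds nothing.
func selectCaches(all []*Cache, hash uint32) []*Cache {
    var out []*Cache
    for _, c := range all {
        if c.Present(hash) {
            out = append(out, c)
        }
    }
    return out
}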