diff --git a/Dockerfile b/Dockerfile index 0b8925e1233a3c6ca0dd883f3f2a60aeba8a1608..a781ae2d66581336c7df41256cff18a5e0d1379b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,11 +21,11 @@ ENV LD_LIBRARY_PATH "/usr/local/lib" # Install DGraph and update dependencies to right versions. RUN go get -v github.com/robfig/glock && \ go get -v github.com/dgraph-io/dgraph/... && \ - glock sync github.com/dgraph-io/dgraph + glock sync github.com/dgraph-io/dgraph && echo "v0.1" # Run some tests, don't build an image if we're failing tests. RUN go test github.com/dgraph-io/dgraph/... -# Create the data directory. This directory should be mapped +# Create the dgraph and data directory. These directories should be mapped # to host machine for persistence. -RUN mkdir /data +RUN mkdir /dgraph && mkdir /data diff --git a/posting/lists.go b/posting/lists.go index 5d88c29a0ddc2134e840e3cb04906ec4b35b1f79..948c3b731e8d092e1111ca816755e2d9cc0a4fc4 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -31,9 +31,6 @@ import ( "github.com/zond/gotomic" ) -var minmemory = flag.Uint64("min_ram_mb", 2048, - "If RAM usage exceeds this, start periodically evicting posting lists"+ - " from memory.") var maxmemory = flag.Uint64("max_ram_mb", 4096, "If RAM usage exceeds this, we stop the world, and flush our buffers.") @@ -46,32 +43,35 @@ type counters struct { func (c *counters) periodicLog() { for _ = range c.ticker.C { - added := atomic.LoadUint64(&c.added) - merged := atomic.LoadUint64(&c.merged) - var pending uint64 - if added > merged { - pending = added - merged - } + c.log() + } +} - glog.WithFields(logrus.Fields{ - "added": added, - "merged": merged, - "clean": atomic.LoadUint64(&c.clean), - "pending": pending, - "mapsize": lhmap.Size(), - "dirtysize": dirtymap.Size(), - }).Info("List Merge counters") +func (c *counters) log() { + added := atomic.LoadUint64(&c.added) + merged := atomic.LoadUint64(&c.merged) + var pending uint64 + if added > merged { + pending = added - merged } + + glog.WithFields(logrus.Fields{ + "added": added, + "merged": merged, + "clean": atomic.LoadUint64(&c.clean), + "pending": pending, + "mapsize": lhmap.Size(), + "dirtysize": dirtymap.Size(), + }).Info("List Merge counters") } func NewCounters() *counters { c := new(counters) c.ticker = time.NewTicker(time.Second) - go c.periodicLog() return c } -var MIB, MAX_MEMORY, MIN_MEMORY uint64 +var MIB, MAX_MEMORY uint64 func aggressivelyEvict(ms runtime.MemStats) { // Okay, we exceed the max memory threshold. @@ -105,7 +105,7 @@ func gentlyMerge(ms runtime.MemStats) { var hs []gotomic.Hashable dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool { hs = append(hs, k) - return len(hs) > 400 + return len(hs) >= 400 }) idx := 0 @@ -133,18 +133,14 @@ func gentlyMerge(ms runtime.MemStats) { } mergeAndUpdate(l, ctr) } + ctr.log() } func checkMemoryUsage() { MIB = 1 << 20 MAX_MEMORY = *maxmemory * MIB - MIN_MEMORY = *minmemory * MIB // Not being used right now. for _ = range time.Tick(5 * time.Second) { - if atomic.LoadInt32(&pauseCheckMemory) == 1 { - continue - } - var ms runtime.MemStats runtime.ReadMemStats(&ms) if ms.Alloc > MAX_MEMORY { @@ -156,7 +152,6 @@ func checkMemoryUsage() { } } -var pauseCheckMemory int32 var stopTheWorld sync.RWMutex var lhmap *gotomic.Hash var dirtymap *gotomic.Hash @@ -242,10 +237,13 @@ func queueAll(ch chan gotomic.Hashable, c *counters) { } func MergeLists(numRoutines int) { - atomic.StoreInt32(&pauseCheckMemory, 1) + // We're merging all the lists, so just create a new dirtymap. + dirtymap = gotomic.NewHash() ch := make(chan gotomic.Hashable, 10000) c := NewCounters() + go c.periodicLog() + defer c.ticker.Stop() go queueAll(ch, c) wg := new(sync.WaitGroup) @@ -255,5 +253,4 @@ func MergeLists(numRoutines int) { } wg.Wait() c.ticker.Stop() - atomic.StoreInt32(&pauseCheckMemory, 0) }