Skip to content
Snippets Groups Projects
Commit f93ca7e3 authored by Manish R Jain's avatar Manish R Jain
Browse files

No need for pauseCheckMemory. Also remove unused flags.

parent 59a48c2c
No related branches found
No related tags found
No related merge requests found
......@@ -21,11 +21,11 @@ ENV LD_LIBRARY_PATH "/usr/local/lib"
# Install DGraph and update dependencies to right versions.
RUN go get -v github.com/robfig/glock && \
go get -v github.com/dgraph-io/dgraph/... && \
glock sync github.com/dgraph-io/dgraph
glock sync github.com/dgraph-io/dgraph && echo "v0.1"
# Run some tests, don't build an image if we're failing tests.
RUN go test github.com/dgraph-io/dgraph/...
# Create the data directory. This directory should be mapped
# Create the dgraph and data directory. These directories should be mapped
# to host machine for persistence.
RUN mkdir /data
RUN mkdir /dgraph && mkdir /data
......@@ -31,9 +31,6 @@ import (
"github.com/zond/gotomic"
)
var minmemory = flag.Uint64("min_ram_mb", 2048,
"If RAM usage exceeds this, start periodically evicting posting lists"+
" from memory.")
var maxmemory = flag.Uint64("max_ram_mb", 4096,
"If RAM usage exceeds this, we stop the world, and flush our buffers.")
......@@ -46,32 +43,35 @@ type counters struct {
func (c *counters) periodicLog() {
for _ = range c.ticker.C {
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
var pending uint64
if added > merged {
pending = added - merged
}
c.log()
}
}
glog.WithFields(logrus.Fields{
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": lhmap.Size(),
"dirtysize": dirtymap.Size(),
}).Info("List Merge counters")
func (c *counters) log() {
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
var pending uint64
if added > merged {
pending = added - merged
}
glog.WithFields(logrus.Fields{
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": lhmap.Size(),
"dirtysize": dirtymap.Size(),
}).Info("List Merge counters")
}
func NewCounters() *counters {
c := new(counters)
c.ticker = time.NewTicker(time.Second)
go c.periodicLog()
return c
}
var MIB, MAX_MEMORY, MIN_MEMORY uint64
var MIB, MAX_MEMORY uint64
func aggressivelyEvict(ms runtime.MemStats) {
// Okay, we exceed the max memory threshold.
......@@ -105,7 +105,7 @@ func gentlyMerge(ms runtime.MemStats) {
var hs []gotomic.Hashable
dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
hs = append(hs, k)
return len(hs) > 400
return len(hs) >= 400
})
idx := 0
......@@ -133,18 +133,14 @@ func gentlyMerge(ms runtime.MemStats) {
}
mergeAndUpdate(l, ctr)
}
ctr.log()
}
func checkMemoryUsage() {
MIB = 1 << 20
MAX_MEMORY = *maxmemory * MIB
MIN_MEMORY = *minmemory * MIB // Not being used right now.
for _ = range time.Tick(5 * time.Second) {
if atomic.LoadInt32(&pauseCheckMemory) == 1 {
continue
}
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
if ms.Alloc > MAX_MEMORY {
......@@ -156,7 +152,6 @@ func checkMemoryUsage() {
}
}
var pauseCheckMemory int32
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
var dirtymap *gotomic.Hash
......@@ -242,10 +237,13 @@ func queueAll(ch chan gotomic.Hashable, c *counters) {
}
func MergeLists(numRoutines int) {
atomic.StoreInt32(&pauseCheckMemory, 1)
// We're merging all the lists, so just create a new dirtymap.
dirtymap = gotomic.NewHash()
ch := make(chan gotomic.Hashable, 10000)
c := NewCounters()
go c.periodicLog()
defer c.ticker.Stop()
go queueAll(ch, c)
wg := new(sync.WaitGroup)
......@@ -255,5 +253,4 @@ func MergeLists(numRoutines int) {
}
wg.Wait()
c.ticker.Stop()
atomic.StoreInt32(&pauseCheckMemory, 0)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment