Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
Ubuntu
committed
"flag"
Manish R Jain
committed
"runtime"
Manish R Jain
committed
"runtime/debug"
"sync/atomic"
Manish R Jain
committed
"time"
Manish R Jain
committed
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
Manish R Jain
committed
"github.com/zond/gotomic"
Manish R Jain
committed
// Command-line limits, in megabytes, on the memory used by this package.
// checkMemoryUsage converts both values to bytes when it starts.
var (
	// minmemory backs the -min_ram_mb flag (default 2048).
	minmemory = flag.Uint64("min_ram_mb", 2048,
		"If RAM usage exceeds this, start periodically evicting posting lists"+
			" from memory.")

	// maxmemory backs the -max_ram_mb flag (default 4096).
	maxmemory = flag.Uint64("max_ram_mb", 4096,
		"If RAM usage exceeds this, we stop the world, and flush our buffers.")
)
// counters tracks the progress of one merge pass over the posting lists and
// owns the ticker that drives periodic logging of those numbers.
type counters struct {
	// The three uint64 fields are read and written through sync/atomic.
	// They are deliberately placed first: on 32-bit platforms only the first
	// word of an allocated struct is guaranteed to be 64-bit aligned, and
	// 64-bit atomic operations on a misaligned word panic.

	// added counts keys queued for merging (see queueAll).
	added uint64
	// merged counts lists that were dirty and got merged (see mergeAndUpdate).
	merged uint64
	// clean counts lists that needed no merge (see mergeAndUpdate).
	clean uint64

	// ticker paces periodicLog; callers stop it when the pass is over.
	ticker *time.Ticker
}
func (c *counters) periodicLog() {
Manish R Jain
committed
for _ = range c.ticker.C {
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
Manish R Jain
committed
var pending uint64
if added > merged {
pending = added - merged
}
glog.WithFields(logrus.Fields{
Manish R Jain
committed
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": lhmap.Size(),
"dirtysize": dirtymap.Size(),
Manish R Jain
committed
}).Info("List Merge counters")
}
}
Manish R Jain
committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// NewCounters returns zeroed counters backed by a one-second ticker and
// starts the goroutine that logs them on every tick. The caller is expected
// to stop the ticker when it is done with the counters.
func NewCounters() *counters {
	ctr := &counters{ticker: time.NewTicker(time.Second)}
	go ctr.periodicLog()
	return ctr
}
// Byte quantities used by the memory checker. All three are zero until
// checkMemoryUsage assigns them.
var (
	// MIB is the number of bytes in one mebibyte (assigned 1 << 20).
	MIB uint64
	// MAX_MEMORY is the -max_ram_mb flag value converted to bytes.
	MAX_MEMORY uint64
	// MIN_MEMORY is the -min_ram_mb flag value converted to bytes.
	MIN_MEMORY uint64
)
// aggressivelyEvict is the response to allocated memory exceeding the
// maximum threshold. While holding the stopTheWorld write lock it merges
// every posting list, forces a garbage collection, asks the runtime to return
// memory to the OS, and logs the allocated megabytes before and after.
//
// ms is the memory snapshot taken by the caller; it is a by-value copy and is
// refreshed locally after the cleanup.
func aggressivelyEvict(ms runtime.MemStats) {
	// We are over the max memory threshold: block everyone else until this
	// has been dealt with.
	stopTheWorld.Lock()
	defer stopTheWorld.Unlock()

	// report logs the allocation recorded in the current contents of ms.
	report := func(msg string) {
		allocMB := ms.Alloc / MIB
		glog.WithField("allocated_MB", allocMB).Info(msg)
	}

	report("Memory usage over threshold. STOPPED THE WORLD!")

	glog.Info("Calling merge on all lists.")
	workers := 100 * runtime.GOMAXPROCS(-1)
	MergeLists(workers)

	glog.Info("Merged lists. Calling GC.")
	runtime.GC() // Let the collector clean up what the merge released.

	glog.Info("Trying to free OS memory")
	debug.FreeOSMemory()

	// Re-read the stats so the second report reflects the cleanup.
	runtime.ReadMemStats(&ms)
	report("Memory Usage after calling GC.")
}
func gentlyMerge(ms runtime.MemStats) {
ctr := NewCounters()
defer ctr.ticker.Stop()
Manish R Jain
committed
// Pick 400 keys from dirty map.
var hs []gotomic.Hashable
dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
hs = append(hs, k)
return len(hs) > 400
})
idx := 0
Manish R Jain
committed
t := time.NewTicker(10 * time.Millisecond)
defer t.Stop()
for _ = range t.C {
Manish R Jain
committed
if idx >= len(hs) {
break
Manish R Jain
committed
}
Manish R Jain
committed
hid := hs[idx]
idx += 1
dirtymap.Delete(hid)
ret, ok := lhmap.Get(hid)
Manish R Jain
committed
if !ok || ret == nil {
Manish R Jain
committed
continue
Manish R Jain
committed
}
// Not calling processOne, because we don't want to
// remove the postings list from the map, to avoid
// a race condition, where another caller re-creates the
// posting list before a merge happens.
l := ret.(*List)
if l == nil {
continue
}
mergeAndUpdate(l, ctr)
}
}
Manish R Jain
committed
func checkMemoryUsage() {
MIB = 1 << 20
Manish R Jain
committed
MAX_MEMORY = *maxmemory * MIB
MIN_MEMORY = *minmemory * MIB // Not being used right now.
Manish R Jain
committed
for _ = range time.Tick(5 * time.Second) {
Manish R Jain
committed
if atomic.LoadInt32(&pauseCheckMemory) == 1 {
continue
}
Manish R Jain
committed
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
Manish R Jain
committed
if ms.Alloc > MAX_MEMORY {
aggressivelyEvict(ms)
Manish R Jain
committed
Manish R Jain
committed
} else {
gentlyMerge(ms)
}
Manish R Jain
committed
}
}
Manish R Jain
committed
var pauseCheckMemory int32
Manish R Jain
committed
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
Manish R Jain
committed
var dirtymap *gotomic.Hash
var pstore *store.Store
func Init(posting *store.Store, log *commit.Logger) {
Manish R Jain
committed
lhmap = gotomic.NewHash()
Manish R Jain
committed
dirtymap = gotomic.NewHash()
Manish R Jain
committed
go checkMemoryUsage()
Manish R Jain
committed
}
Manish R Jain
committed
func GetOrCreate(key []byte) *List {
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
uid := farm.Fingerprint64(key)
Manish R Jain
committed
ukey := gotomic.IntKey(uid)
lp, _ := lhmap.Get(ukey)
if lp != nil {
return lp.(*List)
}
l := NewList()
if inserted := lhmap.PutIfMissing(ukey, l); inserted {
l.init(key, pstore, clog)
Manish R Jain
committed
return l
Manish R Jain
committed
} else {
lp, _ = lhmap.Get(ukey)
return lp.(*List)
Manish R Jain
committed
}
}
Manish R Jain
committed
// mergeAndUpdate calls MergeIfDirty on l and records the outcome in c: an
// error is logged, a performed merge bumps c.merged, and a list that needed
// no merge bumps c.clean. A nil list is ignored.
func mergeAndUpdate(l *List, c *counters) {
	if l == nil {
		return
	}

	merged, err := l.MergeIfDirty()
	switch {
	case err != nil:
		glog.WithError(err).Error("While commiting dirty list.")
	case merged:
		atomic.AddUint64(&c.merged, 1)
	default:
		atomic.AddUint64(&c.clean, 1)
	}
}
Manish R Jain
committed
func processOne(k gotomic.Hashable, c *counters) {
ret, _ := lhmap.Delete(k)
Ubuntu
committed
if ret == nil {
return
}
Manish R Jain
committed
l := ret.(*List)
Manish R Jain
committed
Manish R Jain
committed
if l == nil {
return
Manish R Jain
committed
l.SetForDeletion() // No more AddMutation.
Manish R Jain
committed
mergeAndUpdate(l, c)
Manish R Jain
committed
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
Manish R Jain
committed
// No need to go through dirtymap, because we're going through
// everything right now anyways.
Manish R Jain
committed
for k := range ch {
processOne(k, c)
Manish R Jain
committed
}
Manish R Jain
committed
if wg != nil {
wg.Done()
}
}
Manish R Jain
committed
// queueAll sends every key currently in lhmap to ch, incrementing c.added
// once per key, and closes ch when the iteration is finished.
func queueAll(ch chan gotomic.Hashable, c *counters) {
	enqueue := func(k gotomic.Hashable, _ gotomic.Thing) bool {
		ch <- k
		atomic.AddUint64(&c.added, 1)
		// Returning true would make Each stop early; we want every key.
		return false
	}

	lhmap.Each(enqueue)
	close(ch)
}
func MergeLists(numRoutines int) {
Manish R Jain
committed
atomic.StoreInt32(&pauseCheckMemory, 1)
Manish R Jain
committed
ch := make(chan gotomic.Hashable, 10000)
Manish R Jain
committed
c := NewCounters()
Manish R Jain
committed
go queueAll(ch, c)
wg := new(sync.WaitGroup)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
Manish R Jain
committed
go process(ch, c, wg)
}
wg.Wait()
Manish R Jain
committed
c.ticker.Stop()
Manish R Jain
committed
atomic.StoreInt32(&pauseCheckMemory, 0)