Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
	"flag"
	"runtime"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/commit"
	"github.com/zond/gotomic"
)
var maxmemory = flag.Uint64("max_ram_mb", 4096,
Ubuntu
committed
"If RAM usage exceeds this, we stop the world, and flush our buffers.")
type counters struct {
Manish R Jain
committed
ticker *time.Ticker
added uint64
merged uint64
Manish R Jain
committed
clean uint64
}
func (c *counters) periodicLog() {
Manish R Jain
committed
for _ = range c.ticker.C {
c.log()
}
}
// log writes the merge counters at info level, together with the current
// sizes of lhmap and dirtymap.
//
// "pending" is the number of queued lists (added) that have been neither
// merged nor found clean. The subtraction is guarded because merges can be
// counted without matching additions (gentlyMerge never increments added),
// and uint64 arithmetic would otherwise wrap around.
func (c *counters) log() {
	added := atomic.LoadUint64(&c.added)
	merged := atomic.LoadUint64(&c.merged)
	clean := atomic.LoadUint64(&c.clean)

	var pending uint64
	if done := merged + clean; added > done {
		pending = added - done
	}

	glog.WithFields(logrus.Fields{
		"added":     added,
		"merged":    merged,
		"clean":     clean,
		"pending":   pending,
		"mapsize":   lhmap.Size(),
		"dirtysize": dirtymap.Size(),
	}).Info("List Merge counters")
}
// NewCounters returns a counters value with all counts at zero and a ticker
// that fires once per second. The caller owns the ticker and is responsible
// for stopping it.
func NewCounters() *counters {
	return &counters{ticker: time.NewTicker(time.Second)}
}
var (
	// MIB is the number of bytes in a mebibyte. It is initialized here,
	// not only in checkMemoryUsage, so any division by MIB is safe even
	// before that loop starts.
	MIB uint64 = 1 << 20

	// MAX_MEMORY is the allocation threshold in bytes. checkMemoryUsage
	// sets it from -max_ram_mb; it is zero until then.
	MAX_MEMORY uint64
)
// aggressivelyEvict is the over-threshold path of checkMemoryUsage.
//
// While holding stopTheWorld exclusively, so GetOrCreate callers block, it:
//   - merges every list via MergeLists, which also removes them from lhmap;
//   - forces a garbage collection;
//   - asks the runtime to return freed memory to the OS.
//
// It logs allocated MiB before and after. ms is the snapshot that triggered
// the call and is refreshed in place for the final log line.
func aggressivelyEvict(ms runtime.MemStats) {
	stopTheWorld.Lock()
	defer stopTheWorld.Unlock()

	glog.WithField("allocated_MB", ms.Alloc/MIB).
		Info("Memory usage over threshold. STOPPED THE WORLD!")

	glog.Info("Calling merge on all lists.")
	workers := 100 * runtime.GOMAXPROCS(-1) // GOMAXPROCS(<1) only queries.
	MergeLists(workers)

	glog.Info("Merged lists. Calling GC.")
	runtime.GC()

	glog.Info("Trying to free OS memory")
	debug.FreeOSMemory()

	runtime.ReadMemStats(&ms)
	glog.WithField("allocated_MB", ms.Alloc/MIB).
		Info("Memory Usage after calling GC.")
}
// gentlyMerge is the under-threshold path of checkMemoryUsage. It takes up to
// batchSize keys from dirtymap and merges one list per interval, spreading
// the work out instead of stopping the world.
//
// Each key is removed from dirtymap, but its List is left in lhmap. ms is
// currently unused.
func gentlyMerge(ms runtime.MemStats) {
	const (
		batchSize = 400
		interval  = 10 * time.Millisecond
	)

	ctr := NewCounters()
	defer ctr.ticker.Stop()

	var hs []gotomic.Hashable
	dirtymap.Each(func(k gotomic.Hashable, _ gotomic.Thing) bool {
		hs = append(hs, k)
		return len(hs) >= batchSize // Returning true stops the iteration.
	})
	if len(hs) == 0 {
		return // Nothing dirty; don't wait on the ticker for no work.
	}

	t := time.NewTicker(interval)
	defer t.Stop()
	for _, hid := range hs {
		<-t.C // Rate-limit: one list per tick.
		dirtymap.Delete(hid)
		ret, ok := lhmap.Get(hid)
		if !ok || ret == nil {
			continue
		}
		// Not calling processOne, because we don't want to
		// remove the postings list from the map, to avoid
		// a race condition, where another caller re-creates the
		// posting list before a merge happens.
		l := ret.(*List)
		if l == nil {
			continue
		}
		mergeAndUpdate(l, ctr)
	}
}
// checkMemoryUsage loops forever, sampling heap allocation every 5 seconds.
// When runtime.MemStats.Alloc exceeds MAX_MEMORY it calls aggressivelyEvict;
// otherwise it calls gentlyMerge. Init starts it in its own goroutine.
func checkMemoryUsage() {
	MIB = 1 << 20
	MAX_MEMORY = *maxmemory * MIB

	for range time.Tick(5 * time.Second) {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		if ms.Alloc > MAX_MEMORY {
			aggressivelyEvict(ms)
		} else {
			gentlyMerge(ms)
		}
	}
}
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
Manish R Jain
committed
var dirtymap *gotomic.Hash
var pstore *store.Store
func Init(posting *store.Store, log *commit.Logger) {
Manish R Jain
committed
lhmap = gotomic.NewHash()
Manish R Jain
committed
dirtymap = gotomic.NewHash()
Manish R Jain
committed
go checkMemoryUsage()
Manish R Jain
committed
}
Manish R Jain
committed
func GetOrCreate(key []byte) *List {
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
uid := farm.Fingerprint64(key)
Manish R Jain
committed
ukey := gotomic.IntKey(uid)
lp, _ := lhmap.Get(ukey)
if lp != nil {
return lp.(*List)
}
l := NewList()
if inserted := lhmap.PutIfMissing(ukey, l); inserted {
l.init(key, pstore, clog)
Manish R Jain
committed
return l
Manish R Jain
committed
} else {
lp, _ = lhmap.Get(ukey)
return lp.(*List)
Manish R Jain
committed
}
}
Manish R Jain
committed
// mergeAndUpdate merges l if it is dirty and records the outcome in c.
// A successful merge increments merged, and a list with nothing to merge
// increments clean. A merge error is logged and not counted. A nil l is
// ignored.
func mergeAndUpdate(l *List, c *counters) {
	if l == nil {
		return
	}
	merged, err := l.MergeIfDirty()
	switch {
	case err != nil:
		glog.WithError(err).Error("While commiting dirty list.")
	case merged:
		atomic.AddUint64(&c.merged, 1)
	default:
		atomic.AddUint64(&c.clean, 1)
	}
}
func processOne(k gotomic.Hashable, c *counters) {
ret, _ := lhmap.Delete(k)
Ubuntu
committed
if ret == nil {
return
}
Manish R Jain
committed
l := ret.(*List)
Manish R Jain
committed
Manish R Jain
committed
if l == nil {
return
Manish R Jain
committed
l.SetForDeletion() // No more AddMutation.
Manish R Jain
committed
mergeAndUpdate(l, c)
Manish R Jain
committed
// process is a worker for on-demand merging of all lists. It hands every key
// received on ch to processOne until ch is closed, then calls wg.Done if wg
// is non-nil. dirtymap is not consulted, because every list is visited
// anyway.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	for k := range ch {
		processOne(k, c)
	}
}
// queueAll sends every key currently in lhmap on ch, counting each one in
// c.added. It then closes ch so that the process workers terminate.
func queueAll(ch chan gotomic.Hashable, c *counters) {
	enqueue := func(key gotomic.Hashable, _ gotomic.Thing) bool {
		ch <- key
		atomic.AddUint64(&c.added, 1)
		return false // Keep going; returning true would stop Each early.
	}
	lhmap.Each(enqueue)
	close(ch)
}
func MergeLists(numRoutines int) {
// We're merging all the lists, so just create a new dirtymap.
dirtymap = gotomic.NewHash()
Manish R Jain
committed
Manish R Jain
committed
ch := make(chan gotomic.Hashable, 10000)
Manish R Jain
committed
c := NewCounters()
go c.periodicLog()
defer c.ticker.Stop()
Manish R Jain
committed
go queueAll(ch, c)
wg := new(sync.WaitGroup)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
Manish R Jain
committed
go process(ch, c, wg)
}
wg.Wait()
Manish R Jain
committed
c.ticker.Stop()
}