Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
Ubuntu
committed
"flag"
Manish R Jain
committed
"math/rand"
Manish R Jain
committed
"runtime"
Manish R Jain
committed
"runtime/debug"
"sync/atomic"
Manish R Jain
committed
"time"
Manish R Jain
committed
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
Manish R Jain
committed
"github.com/zond/gotomic"
Manish R Jain
committed
// maxmemory holds the value of the -stw_ram_mb flag (default 4096).
// checkMemoryUsage multiplies it by MIB to obtain MAX_MEMORY, the allocation
// threshold above which the world is stopped and all lists are merged.
var maxmemory = flag.Uint64("stw_ram_mb", 4096,
	"If RAM usage exceeds this, we stop the world, and flush our buffers.")
// mergeRoutines is a mutex-guarded integer counter. checkMemoryUsage uses it
// to track how many gentlyMerge goroutines are currently in flight.
type mergeRoutines struct {
	sync.RWMutex
	count int
}

// Count returns the current value of the counter under a read lock.
func (mr *mergeRoutines) Count() int {
	mr.RLock()
	current := mr.count
	mr.RUnlock()
	return current
}

// Add adjusts the counter by delta (which may be negative) under the write
// lock.
func (mr *mergeRoutines) Add(delta int) {
	mr.Lock()
	defer mr.Unlock()
	mr.count = mr.count + delta
}
// counters tracks the progress of a merge pass.
//
// The uint64 fields are read and written with sync/atomic. They are kept at
// the very start of the struct on purpose: on 32-bit platforms (386, ARM,
// 32-bit MIPS) atomic 64-bit operations require 64-bit alignment, which Go
// only guarantees for the first word of an allocated struct. Do not place
// other fields ahead of them.
type counters struct {
	added   uint64 // keys queued for processing
	merged  uint64 // lists for which MergeIfDirty reported a merge
	clean   uint64 // lists for which MergeIfDirty reported nothing to merge
	lastVal uint64 // value of merged at the time of the last log entry

	// ticker drives periodic logging; the owner is responsible for stopping it.
	ticker *time.Ticker
}
func (c *counters) periodicLog() {
Manish R Jain
committed
for _ = range c.ticker.C {
c.log()
}
}
func (c *counters) log() {
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
lastVal := atomic.LoadUint64(&c.lastVal)
if merged == lastVal {
// Ignore.
return
}
atomic.StoreUint64(&c.lastVal, merged)
var pending uint64
if added > merged {
pending = added - merged
glog.WithFields(logrus.Fields{
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": lhmap.Size(),
"dirtysize": dirtymap.Size(),
}).Info("List Merge counters")
Manish R Jain
committed
// NewCounters returns a counters value with every count at zero and a ticker
// that fires once per second. The caller should stop the ticker when done.
func NewCounters() *counters {
	return &counters{ticker: time.NewTicker(time.Second)}
}
var (
	// MIB is the number of bytes in one mebibyte. It is zero until
	// checkMemoryUsage assigns 1 << 20 to it.
	MIB uint64

	// MAX_MEMORY is the allocation threshold in bytes. checkMemoryUsage sets
	// it to the -stw_ram_mb flag value multiplied by MIB.
	MAX_MEMORY uint64
)
Manish R Jain
committed
// aggressivelyEvict is the stop-the-world path taken once allocated memory is
// over the threshold. While holding the stopTheWorld write lock (which blocks
// GetOrCreate, a read-lock holder) it merges every list, forces a garbage
// collection, asks the runtime to return memory to the OS, and logs the
// allocation before and after.
//
// ms is the memory snapshot that triggered the eviction; it is a local copy
// and is refreshed in place for the final log entry.
func aggressivelyEvict(ms runtime.MemStats) {
	stopTheWorld.Lock()
	defer stopTheWorld.Unlock()

	// Reports whatever ms currently holds, converted to MiB.
	logAllocated := func(msg string) {
		glog.WithField("allocated_MB", ms.Alloc/MIB).Info(msg)
	}
	logAllocated("Memory usage over threshold. STOPPED THE WORLD!")

	glog.Info("Calling merge on all lists.")
	// GOMAXPROCS(-1) only queries the current setting without changing it.
	workers := 100 * runtime.GOMAXPROCS(-1)
	MergeLists(workers)

	glog.Info("Merged lists. Calling GC.")
	runtime.GC() // Reclaim whatever the merge released.

	glog.Info("Trying to free OS memory")
	debug.FreeOSMemory()

	runtime.ReadMemStats(&ms)
	logAllocated("Memory Usage after calling GC.")
}
func gentlyMerge(mr *mergeRoutines) {
defer mr.Add(-1)
Manish R Jain
committed
ctr := NewCounters()
defer ctr.ticker.Stop()
Manish R Jain
committed
// Pick 5% of the dirty map or 400 keys, whichever is higher.
pick := int(float64(dirtymap.Size()) * 0.05)
Manish R Jain
committed
if pick < 400 {
pick = 400
}
Manish R Jain
committed
// We should start picking up elements from a randomly selected index,
// otherwise, the same keys would keep on getting merged, while the
// rest would never get a chance.
var start int
n := dirtymap.Size() - pick
if n <= 0 {
start = 0
} else {
start = rand.Intn(n)
}
Manish R Jain
committed
var hs []gotomic.Hashable
Manish R Jain
committed
idx := 0
Manish R Jain
committed
dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
Manish R Jain
committed
if idx < start {
idx += 1
return false
}
Manish R Jain
committed
hs = append(hs, k)
Manish R Jain
committed
return len(hs) >= pick
Manish R Jain
committed
})
Manish R Jain
committed
for _, hid := range hs {
Manish R Jain
committed
dirtymap.Delete(hid)
ret, ok := lhmap.Get(hid)
Manish R Jain
committed
if !ok || ret == nil {
Manish R Jain
committed
continue
Manish R Jain
committed
}
// Not calling processOne, because we don't want to
// remove the postings list from the map, to avoid
// a race condition, where another caller re-creates the
// posting list before a merge happens.
l := ret.(*List)
if l == nil {
continue
}
mergeAndUpdate(l, ctr)
}
Manish R Jain
committed
}
Manish R Jain
committed
func checkMemoryUsage() {
MIB = 1 << 20
Manish R Jain
committed
MAX_MEMORY = *maxmemory * MIB
Manish R Jain
committed
var mr mergeRoutines
Manish R Jain
committed
for _ = range time.Tick(5 * time.Second) {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
Manish R Jain
committed
if ms.Alloc > MAX_MEMORY {
aggressivelyEvict(ms)
Manish R Jain
committed
Manish R Jain
committed
} else {
// If merging is slow, we don't want to end up having too many goroutines
// merging the dirty list. This should keep them in check.
Manish R Jain
committed
// With a value of 18 and duration of 5 seconds, some goroutines are
// taking over 1.5 mins to finish.
if mr.Count() > 18 {
glog.Info("Skipping gentle merging.")
continue
}
mr.Add(1)
Manish R Jain
committed
// gentlyMerge can take a while to finish. So, run it in a goroutine.
go gentlyMerge(&mr)
Manish R Jain
committed
}
Manish R Jain
committed
}
}
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
Manish R Jain
committed
var dirtymap *gotomic.Hash
Manish R Jain
committed
lhmap = gotomic.NewHash()
Manish R Jain
committed
dirtymap = gotomic.NewHash()
Manish R Jain
committed
go checkMemoryUsage()
Manish R Jain
committed
}
func GetOrCreate(key []byte, pstore *store.Store) *List {
Manish R Jain
committed
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
uid := farm.Fingerprint64(key)
Manish R Jain
committed
ukey := gotomic.IntKey(uid)
lp, _ := lhmap.Get(ukey)
if lp != nil {
return lp.(*List)
}
l := NewList()
if inserted := lhmap.PutIfMissing(ukey, l); inserted {
l.init(key, pstore, clog)
Manish R Jain
committed
return l
Manish R Jain
committed
} else {
lp, _ = lhmap.Get(ukey)
return lp.(*List)
Manish R Jain
committed
}
}
Manish R Jain
committed
// mergeAndUpdate calls MergeIfDirty on l and records the outcome in c:
// an error is logged and leaves the counters untouched, a successful merge
// increments c.merged, and a list with nothing to merge increments c.clean.
// A nil list is ignored.
func mergeAndUpdate(l *List, c *counters) {
	if l == nil {
		return
	}

	merged, err := l.MergeIfDirty()
	switch {
	case err != nil:
		glog.WithError(err).Error("While commiting dirty list.")
	case merged:
		atomic.AddUint64(&c.merged, 1)
	default:
		atomic.AddUint64(&c.clean, 1)
	}
}
Manish R Jain
committed
func processOne(k gotomic.Hashable, c *counters) {
ret, _ := lhmap.Delete(k)
Ubuntu
committed
if ret == nil {
return
}
Manish R Jain
committed
l := ret.(*List)
Manish R Jain
committed
Manish R Jain
committed
if l == nil {
return
Manish R Jain
committed
l.SetForDeletion() // No more AddMutation.
Manish R Jain
committed
mergeAndUpdate(l, c)
Manish R Jain
committed
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
Manish R Jain
committed
// No need to go through dirtymap, because we're going through
// everything right now anyways.
Manish R Jain
committed
for k := range ch {
processOne(k, c)
Manish R Jain
committed
}
Manish R Jain
committed
if wg != nil {
wg.Done()
}
}
Manish R Jain
committed
// queueAll sends every key currently in lhmap to ch, incrementing c.added
// once per key, and closes ch after the last key so that receivers ranging
// over the channel terminate.
func queueAll(ch chan gotomic.Hashable, c *counters) {
	enqueue := func(key gotomic.Hashable, _ gotomic.Thing) bool {
		ch <- key
		atomic.AddUint64(&c.added, 1)
		// Each stops iterating on true; false keeps it going over all keys.
		return false
	}
	lhmap.Each(enqueue)
	close(ch)
}
func MergeLists(numRoutines int) {
// We're merging all the lists, so just create a new dirtymap.
dirtymap = gotomic.NewHash()
Manish R Jain
committed
Manish R Jain
committed
ch := make(chan gotomic.Hashable, 10000)
Manish R Jain
committed
c := NewCounters()
go c.periodicLog()
defer c.ticker.Stop()
Manish R Jain
committed
go queueAll(ch, c)
wg := new(sync.WaitGroup)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
Manish R Jain
committed
go process(ch, c, wg)
}
wg.Wait()
Manish R Jain
committed
c.ticker.Stop()
}