/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
	"flag"
	"runtime"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/commit"
	"github.com/zond/gotomic"
)

// Memory thresholds, expressed in megabytes and settable from the command
// line. checkMemoryUsage converts them to bytes.
var (
	// minmemory backs the -min_ram_mb flag (default 2048). It is converted
	// into MIN_MEMORY, which is not consulted anywhere else in this file.
	minmemory = flag.Uint64("min_ram_mb", 2048,
		"If RAM usage exceeds this, start periodically evicting posting lists"+
			" from memory.")

	// maxmemory backs the -max_ram_mb flag (default 4096). Once the allocated
	// heap exceeds it, checkMemoryUsage switches to aggressivelyEvict.
	maxmemory = flag.Uint64("max_ram_mb", 4096,
		"If RAM usage exceeds this, we stop the world, and flush our buffers.")
)

// counters holds the statistics of one merge run together with the ticker
// that drives periodic logging of those statistics.
//
// The three uint64 fields are read and written through sync/atomic. They are
// deliberately placed at the start of the struct: on 32-bit platforms only
// the first word of an allocated struct is guaranteed to be 64-bit aligned,
// and atomic 64-bit operations on a misaligned word panic there.
type counters struct {
	added  uint64 // keys handed to the workers (incremented in queueAll)
	merged uint64 // lists for which MergeIfDirty reported a merge
	clean  uint64 // lists for which MergeIfDirty reported nothing to merge

	ticker *time.Ticker // paces periodicLog; stopped by whoever owns the run
}

func (c *counters) periodicLog() {
Manish R Jain
committed
for _ = range c.ticker.C {
mapSize := lhmap.Size()
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
pending := added - merged
glog.WithFields(logrus.Fields{
"added": added,
"merged": merged,
Manish R Jain
committed
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
Manish R Jain
committed
"mapsize": mapSize,
}).Info("List Merge counters")
}
}
// NewCounters returns a zeroed set of counters backed by a one-second ticker
// and starts a goroutine that runs periodicLog on it.
//
// The caller is expected to stop the ticker when the run is over.
// NOTE(review): stopping the ticker silences the logging but does not close
// the ticker channel, so the goroutine started here is not released.
func NewCounters() *counters {
	c := &counters{ticker: time.NewTicker(time.Second)}
	go c.periodicLog()
	return c
}

// Byte-denominated memory limits. All three start at zero and are filled in
// by checkMemoryUsage when it starts.
var (
	// MIB is the number of bytes in one mebibyte (set to 1 << 20).
	MIB uint64
	// MAX_MEMORY is the -max_ram_mb flag value converted to bytes.
	MAX_MEMORY uint64
	// MIN_MEMORY is the -min_ram_mb flag value converted to bytes.
	MIN_MEMORY uint64
)

// aggressivelyEvict is the hard-limit response to memory pressure. While
// holding the stopTheWorld write lock it merges every list via MergeLists,
// forces a garbage collection, asks the runtime to hand freed memory back to
// the OS, and logs the allocated heap size before and after.
//
// ms is the snapshot that triggered the call. It is received by value, so the
// refresh at the end only updates the local copy used for logging.
func aggressivelyEvict(ms runtime.MemStats) {
	// Block every GetOrCreate caller until the cleanup below is finished.
	stopTheWorld.Lock()
	defer stopTheWorld.Unlock()

	allocatedBefore := ms.Alloc / MIB
	glog.WithField("allocated_MB", allocatedBefore).
		Info("Memory usage over threshold. STOPPED THE WORLD!")

	glog.Info("Calling merge on all lists.")
	MergeLists(100 * runtime.GOMAXPROCS(-1)) // -1 only queries the setting.

	glog.Info("Merged lists. Calling GC.")
	runtime.GC()

	glog.Info("Trying to free OS memory")
	debug.FreeOSMemory()

	// Re-read the statistics to report how much was actually reclaimed.
	runtime.ReadMemStats(&ms)
	allocatedAfter := ms.Alloc / MIB
	glog.WithField("allocated_MB", allocatedAfter).
		Info("Memory Usage after calling GC.")
}

// gentlyMerge is the low-pressure maintenance path. Paced by a 10ms ticker,
// it pops lists off dirtyList one at a time and merges them, stopping after
// 400 ticks (about four seconds) or as soon as the dirty list yields nothing.
//
// ms is accepted for symmetry with aggressivelyEvict and is not used.
func gentlyMerge(ms runtime.MemStats) {
	stats := NewCounters()
	defer stats.ticker.Stop()

	pace := time.NewTicker(10 * time.Millisecond)
	defer pace.Stop()

	ticks := 0
	for range pace.C {
		ticks++
		if ticks > 400 {
			// 100 merges per second, so this is roughly a 4 second budget.
			break
		}

		item, ok := dirtyList.Pop()
		if !ok || item == nil {
			break
		}

		// processOne is deliberately avoided here: it would remove the list
		// from the map, which opens a race where another caller re-creates
		// the posting list before this merge has happened.
		if l := item.(*List); l != nil {
			mergeAndUpdate(l, stats)
		}
	}
}

func checkMemoryUsage() {
MIB = 1 << 20
Manish R Jain
committed
MAX_MEMORY = *maxmemory * MIB
MIN_MEMORY = *minmemory * MIB // Not being used right now.
Manish R Jain
committed
for _ = range time.Tick(5 * time.Second) {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
Manish R Jain
committed
if ms.Alloc > MAX_MEMORY {
aggressivelyEvict(ms)
Manish R Jain
committed
Manish R Jain
committed
} else {
gentlyMerge(ms)
}
Manish R Jain
committed
}
}
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
Manish R Jain
committed
var dirtyList *gotomic.List
var pstore *store.Store
func Init(posting *store.Store, log *commit.Logger) {
Manish R Jain
committed
lhmap = gotomic.NewHash()
Manish R Jain
committed
dirtyList = gotomic.NewList()
Manish R Jain
committed
go checkMemoryUsage()
Manish R Jain
committed
}
func GetOrCreate(key []byte) *List {
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
uid := farm.Fingerprint64(key)
Manish R Jain
committed
ukey := gotomic.IntKey(uid)
lp, _ := lhmap.Get(ukey)
if lp != nil {
return lp.(*List)
}
l := NewList()
if inserted := lhmap.PutIfMissing(ukey, l); inserted {
l.init(key, pstore, clog)
Manish R Jain
committed
return l
Manish R Jain
committed
} else {
lp, _ = lhmap.Get(ukey)
return lp.(*List)
Manish R Jain
committed
}
}
// mergeAndUpdate asks l to merge itself if it is dirty and records the
// outcome in c: merged is incremented when a merge took place, clean when
// there was nothing to merge. A merge error is logged and counted as neither.
// A nil list is ignored.
func mergeAndUpdate(l *List, c *counters) {
	if l == nil {
		return
	}

	didMerge, err := l.MergeIfDirty()
	switch {
	case err != nil:
		glog.WithError(err).Error("While commiting dirty list.")
	case didMerge:
		atomic.AddUint64(&c.merged, 1)
	default:
		atomic.AddUint64(&c.clean, 1)
	}
}

func processOne(k gotomic.Hashable, c *counters) {
ret, _ := lhmap.Delete(k)
Ubuntu
committed
if ret == nil {
return
}
Manish R Jain
committed
l := ret.(*List)
Manish R Jain
committed
Manish R Jain
committed
if l == nil {
return
Manish R Jain
committed
l.SetForDeletion() // No more AddMutation.
Manish R Jain
committed
mergeAndUpdate(l, c)
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
Manish R Jain
committed
for dirtyList.Size() > 0 {
ret, ok := dirtyList.Pop()
if !ok || ret == nil {
continue
}
l := ret.(*List)
mergeAndUpdate(l, c)
}
for k := range ch {
processOne(k, c)
Manish R Jain
committed
}
Manish R Jain
committed
if wg != nil {
wg.Done()
}
}
// queueAll walks lhmap and sends every key it visits on ch, bumping c.added
// once per key. When the walk is over it closes ch, which lets the workers
// ranging over the channel terminate. Sends block while ch is full.
func queueAll(ch chan gotomic.Hashable, c *counters) {
	enqueue := func(key gotomic.Hashable, _ gotomic.Thing) bool {
		ch <- key
		atomic.AddUint64(&c.added, 1)
		// Returning true would make Each stop early; keep iterating.
		return false
	}

	lhmap.Each(enqueue)
	close(ch)
}

func MergeLists(numRoutines int) {
Manish R Jain
committed
ch := make(chan gotomic.Hashable, 10000)
Manish R Jain
committed
c := NewCounters()
Manish R Jain
committed
go queueAll(ch, c)
wg := new(sync.WaitGroup)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
Manish R Jain
committed
go process(ch, c, wg)
}
wg.Wait()
Manish R Jain
committed
c.ticker.Stop()
}