Skip to content
Snippets Groups Projects
lists.go 5.45 KiB
Newer Older
/*
 * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package posting

import (
	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgryski/go-farm"
var maxmemory = flag.Uint64("max_ram_mb", 4096,
	"If RAM usage exceeds this, we stop the world, and flush our buffers.")

}

func (c *counters) periodicLog() {
func (c *counters) log() {
	added := atomic.LoadUint64(&c.added)
	merged := atomic.LoadUint64(&c.merged)
	var pending uint64
	if added > merged {
		pending = added - merged

	glog.WithFields(logrus.Fields{
		"added":     added,
		"merged":    merged,
		"clean":     atomic.LoadUint64(&c.clean),
		"pending":   pending,
		"mapsize":   lhmap.Size(),
		"dirtysize": dirtymap.Size(),
	}).Info("List Merge counters")
// NewCounters returns a zeroed counters value whose ticker fires once per
// second. The caller owns the ticker and must Stop it when finished.
// NOTE(review): the ticker presumably drives periodicLog — confirm.
func NewCounters() *counters {
	var c counters
	c.ticker = time.NewTicker(time.Second)
	return &c
}

// MIB is the divisor that converts runtime.MemStats byte counts into the
// "allocated_MB" values logged by aggressivelyEvict. MAX_MEMORY is the
// ms.Alloc threshold, in bytes, above which aggressivelyEvict is triggered.
// NOTE(review): neither is assigned in this view. Presumably MIB = 1<<20 and
// MAX_MEMORY is derived from the max_ram_mb flag — confirm.
var MIB, MAX_MEMORY uint64

// aggressivelyEvict is the emergency path for when heap allocation exceeds
// MAX_MEMORY. It does the following while holding the stopTheWorld write lock:
//   - merges every posting list,
//   - forces a garbage collection,
//   - asks the runtime to return freed memory to the OS.
//
// Holding the write lock blocks GetOrCreate, which takes the read lock.
// Heap usage is logged before and after.
//
// ms is the snapshot that triggered eviction. It is used for the "before"
// log line and is then overwritten with fresh statistics for the "after" line.
func aggressivelyEvict(ms runtime.MemStats) {
	// Exclusive lock: readers (GetOrCreate) wait until eviction completes.
	stopTheWorld.Lock()
	defer stopTheWorld.Unlock()

	before := ms.Alloc / MIB
	glog.WithField("allocated_MB", before).
		Info("Memory usage over threshold. STOPPED THE WORLD!")

	glog.Info("Calling merge on all lists.")
	// GOMAXPROCS with an argument < 1 only queries the current setting.
	workers := runtime.GOMAXPROCS(-1) * 100
	MergeLists(workers)

	glog.Info("Merged lists. Calling GC.")
	// NOTE(review): debug.FreeOSMemory already forces a GC, so this explicit
	// collection looks redundant. It is kept to preserve existing behavior.
	runtime.GC()
	glog.Info("Trying to free OS memory")
	debug.FreeOSMemory()

	// Re-sample so the final log line reflects the post-collection heap.
	runtime.ReadMemStats(&ms)
	after := ms.Alloc / MIB
	glog.WithField("allocated_MB", after).
		Info("Memory Usage after calling GC.")
}

func gentlyMerge(ms runtime.MemStats) {
	ctr := NewCounters()
	defer ctr.ticker.Stop()

	// Pick 400 keys from dirty map.
	var hs []gotomic.Hashable
	dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
		hs = append(hs, k)
	t := time.NewTicker(10 * time.Millisecond)
	defer t.Stop()
	for _ = range t.C {
		hid := hs[idx]
		idx += 1
		dirtymap.Delete(hid)

		ret, ok := lhmap.Get(hid)
		}
		// Not calling processOne, because we don't want to
		// remove the postings list from the map, to avoid
		// a race condition, where another caller re-creates the
		// posting list before a merge happens.
		l := ret.(*List)
		if l == nil {
			continue
		}
		mergeAndUpdate(l, ctr)
	}

	for _ = range time.Tick(5 * time.Second) {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		if ms.Alloc > MAX_MEMORY {
			aggressivelyEvict(ms)
	}
}

// stopTheWorld coordinates normal posting-list access with emergency
// eviction. GetOrCreate holds the read lock. aggressivelyEvict takes the
// write lock while it merges every list and forces a GC.
var stopTheWorld sync.RWMutex

// lhmap maps gotomic.IntKey(farm.Fingerprint64(key)) to the *List for that
// key. GetOrCreate inserts entries into it, and processOne deletes them.
var lhmap *gotomic.Hash

// pstore is a *store.Store. NOTE(review): presumably the posting-list store,
// assigned from Init's posting argument — confirm.
var pstore *store.Store

// clog is a *commit.Logger. NOTE(review): presumably assigned from Init's
// log argument — confirm.
var clog *commit.Logger
func Init(posting *store.Store, log *commit.Logger) {
func GetOrCreate(key []byte) *List {
	stopTheWorld.RLock()
	defer stopTheWorld.RUnlock()

	uid := farm.Fingerprint64(key)
	ukey := gotomic.IntKey(uid)
	lp, _ := lhmap.Get(ukey)
	if lp != nil {
		return lp.(*List)
	}

	l := NewList()
	if inserted := lhmap.PutIfMissing(ukey, l); inserted {
	} else {
		lp, _ = lhmap.Get(ukey)
		return lp.(*List)
// mergeAndUpdate merges l if it has pending changes and records the outcome
// in c:
//   - c.merged is incremented when a merge happened.
//   - c.clean is incremented when there was nothing to merge.
//   - On error the error is logged and neither counter changes.
//
// A nil list is ignored.
func mergeAndUpdate(l *List, c *counters) {
	if l == nil {
		return
	}

	didMerge, err := l.MergeIfDirty()
	switch {
	case err != nil:
		glog.WithError(err).Error("While commiting dirty list.")
	case didMerge:
		atomic.AddUint64(&c.merged, 1)
	default:
		atomic.AddUint64(&c.clean, 1)
	}
}

func processOne(k gotomic.Hashable, c *counters) {
	ret, _ := lhmap.Delete(k)
	l.SetForDeletion() // No more AddMutation.
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
	// No need to go through dirtymap, because we're going through
	// everything right now anyways.
// queueAll sends the key of every entry in lhmap on ch, counting each send in
// c.added, then closes ch. Sends block when ch is full, so consumers must be
// draining ch concurrently.
func queueAll(ch chan gotomic.Hashable, c *counters) {
	enqueue := func(key gotomic.Hashable, _ gotomic.Thing) bool {
		ch <- key
		atomic.AddUint64(&c.added, 1)
		// Returning true would stop Each early; we want every key.
		return false
	}
	lhmap.Each(enqueue)
	close(ch)
}

func MergeLists(numRoutines int) {
	// We're merging all the lists, so just create a new dirtymap.
	dirtymap = gotomic.NewHash()
	ch := make(chan gotomic.Hashable, 10000)
	go c.periodicLog()
	defer c.ticker.Stop()

	wg := new(sync.WaitGroup)
	for i := 0; i < numRoutines; i++ {
		wg.Add(1)