Skip to content
Snippets Groups Projects
lists.go 5.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
     * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * 		http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package posting
    
    import (
    
    	"github.com/dgraph-io/dgraph/commit"
    
    	"github.com/dgraph-io/dgraph/store"
    
    	"github.com/dgryski/go-farm"
    
    // maxmemory holds the -stw_ram_mb command-line flag: a RAM budget in MB
    // (default 4096). The byte threshold actually compared against at runtime
    // is MAX_MEMORY; presumably Init derives it from this flag — confirm.
    var maxmemory = flag.Uint64(
    	"stw_ram_mb",
    	4096,
    	"If RAM usage exceeds this, we stop the world, and flush our buffers.",
    )
    
    
    }
    
    func (c *counters) periodicLog() {
    
    func (c *counters) log() {
    	added := atomic.LoadUint64(&c.added)
    	merged := atomic.LoadUint64(&c.merged)
    	var pending uint64
    	if added > merged {
    		pending = added - merged
    
    
    	glog.WithFields(logrus.Fields{
    		"added":     added,
    		"merged":    merged,
    		"clean":     atomic.LoadUint64(&c.clean),
    		"pending":   pending,
    		"mapsize":   lhmap.Size(),
    		"dirtysize": dirtymap.Size(),
    	}).Info("List Merge counters")
    
    // NewCounters returns a counters value with every count at zero and a
    // ticker that fires once per second. A ticker keeps running until it is
    // stopped, so callers should Stop it when finished (gentlyMerge and
    // MergeLists both defer ticker.Stop()).
    func NewCounters() *counters {
    	return &counters{ticker: time.NewTicker(time.Second)}
    }
    
    
    // MIB and MAX_MEMORY start at zero and are not assigned here (presumably
    // Init sets them — confirm).
    //
    // MIB is the divisor used to report ms.Alloc in megabytes. It must be
    // non-zero before aggressivelyEvict runs, or the integer division panics.
    //
    // MAX_MEMORY is compared against ms.Alloc, which is in bytes. When
    // allocation exceeds it, aggressivelyEvict is triggered.
    var (
    	MIB        uint64
    	MAX_MEMORY uint64
    )
    
    
    // aggressivelyEvict is the emergency path taken once heap allocation
    // (ms.Alloc) has gone past MAX_MEMORY. It holds the write side of
    // stopTheWorld for its entire run, so GetOrCreate, which holds the read
    // side, cannot proceed until this returns.
    //
    // While the world is stopped it:
    //   - merges every posting list using 100 workers per GOMAXPROCS;
    //   - forces a garbage collection;
    //   - asks the runtime to return freed memory to the OS.
    //
    // Allocation is logged in MB (ms.Alloc / MIB) before and after the work.
    func aggressivelyEvict(ms runtime.MemStats) {
    	stopTheWorld.Lock()
    	defer stopTheWorld.Unlock()
    
    	glog.WithField("allocated_MB", ms.Alloc/MIB).
    		Info("Memory usage over threshold. STOPPED THE WORLD!")
    
    	glog.Info("Calling merge on all lists.")
    	workers := 100 * runtime.GOMAXPROCS(-1) // n < 1 only queries the setting
    	MergeLists(workers)
    
    	glog.Info("Merged lists. Calling GC.")
    	runtime.GC()
    	glog.Info("Trying to free OS memory")
    	debug.FreeOSMemory()
    
    	// Take a fresh reading so the log shows the effect of the GC.
    	var after runtime.MemStats
    	runtime.ReadMemStats(&after)
    	glog.WithField("allocated_MB", after.Alloc/MIB).
    		Info("Memory Usage after calling GC.")
    }
    
    func gentlyMerge(ms runtime.MemStats) {
    	ctr := NewCounters()
    	defer ctr.ticker.Stop()
    
    
    	// Pick 400 keys from dirty map.
    	var hs []gotomic.Hashable
    	dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
    		hs = append(hs, k)
    
    	t := time.NewTicker(10 * time.Millisecond)
    	defer t.Stop()
    	for _ = range t.C {
    
    		hid := hs[idx]
    		idx += 1
    		dirtymap.Delete(hid)
    
    		ret, ok := lhmap.Get(hid)
    
    		}
    		// Not calling processOne, because we don't want to
    		// remove the postings list from the map, to avoid
    		// a race condition, where another caller re-creates the
    		// posting list before a merge happens.
    		l := ret.(*List)
    		if l == nil {
    			continue
    		}
    		mergeAndUpdate(l, ctr)
    	}
    
    
    	for _ = range time.Tick(5 * time.Second) {
    		var ms runtime.MemStats
    		runtime.ReadMemStats(&ms)
    
    		if ms.Alloc > MAX_MEMORY {
    			aggressivelyEvict(ms)
    
    	}
    }
    
    var (
    	// stopTheWorld is locked for reading by GetOrCreate and for writing by
    	// aggressivelyEvict. GetOrCreate therefore cannot run while a forced
    	// merge and GC is in progress.
    	stopTheWorld sync.RWMutex
    
    	// lhmap maps gotomic.IntKey(farm.Fingerprint64(key)) to the *List for
    	// that key.
    	lhmap *gotomic.Hash
    
    	// clog is the package's commit log (presumably assigned by Init — confirm).
    	clog *commit.Logger
    )
    
    Ashwin's avatar
    Ashwin committed
    func Init(log *commit.Logger) {
    
    Ashwin's avatar
    Ashwin committed
    func GetOrCreate(key []byte, pstore *store.Store) *List {
    
    	stopTheWorld.RLock()
    	defer stopTheWorld.RUnlock()
    
    
    	uid := farm.Fingerprint64(key)
    
    	ukey := gotomic.IntKey(uid)
    	lp, _ := lhmap.Get(ukey)
    	if lp != nil {
    		return lp.(*List)
    	}
    
    	l := NewList()
    	if inserted := lhmap.PutIfMissing(ukey, l); inserted {
    
    	} else {
    		lp, _ = lhmap.Get(ukey)
    		return lp.(*List)
    
    // mergeAndUpdate calls l.MergeIfDirty and records the outcome in c.
    //
    // A nil list is ignored. Otherwise exactly one of the following happens:
    //   - an error is logged and no counter changes;
    //   - c.merged is incremented when MergeIfDirty reports a merge;
    //   - c.clean is incremented when it reports nothing to merge.
    //
    // The counters are updated atomically.
    func mergeAndUpdate(l *List, c *counters) {
    	if l == nil {
    		return
    	}
    
    	merged, err := l.MergeIfDirty()
    	switch {
    	case err != nil:
    		glog.WithError(err).Error("While commiting dirty list.")
    	case merged:
    		atomic.AddUint64(&c.merged, 1)
    	default:
    		atomic.AddUint64(&c.clean, 1)
    	}
    }
    
    
    func processOne(k gotomic.Hashable, c *counters) {
    	ret, _ := lhmap.Delete(k)
    
    	l.SetForDeletion() // No more AddMutation.
    
    // For on-demand merging of all lists.
    func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
    
    	// No need to go through dirtymap, because we're going through
    	// everything right now anyways.
    
    // queueAll sends every key currently in lhmap on ch and bumps c.added
    // once per key. Each send blocks while ch's buffer is full. After the
    // iteration finishes, ch is closed to signal receivers that no more keys
    // will arrive.
    func queueAll(ch chan gotomic.Hashable, c *counters) {
    	enqueue := func(key gotomic.Hashable, _ gotomic.Thing) bool {
    		ch <- key
    		atomic.AddUint64(&c.added, 1)
    		// Returning true would make Each stop early; visit every entry.
    		return false
    	}
    	lhmap.Each(enqueue)
    
    	close(ch)
    }
    
    func MergeLists(numRoutines int) {
    
    	// We're merging all the lists, so just create a new dirtymap.
    	dirtymap = gotomic.NewHash()
    
    	ch := make(chan gotomic.Hashable, 10000)
    
    	go c.periodicLog()
    	defer c.ticker.Stop()
    
    
    	wg := new(sync.WaitGroup)
    	for i := 0; i < numRoutines; i++ {
    		wg.Add(1)