Skip to content
Snippets Groups Projects
Commit baf842c9 authored by Manish R Jain's avatar Manish R Jain
Browse files

Switch dirtylist with dirtymap, because list causes a merge explosion, when...

Switch dirtylist with dirtymap, because list causes a merge explosion, when the same posting list gets multiple updates.
parent 0e423671
No related branches found
No related tags found
No related merge requests found
......@@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
"github.com/google/flatbuffers/go"
"github.com/zond/gotomic"
)
var glog = x.Log("posting")
......@@ -57,6 +58,7 @@ type MutationLink struct {
type List struct {
sync.RWMutex
key []byte
ghash gotomic.Hashable
hash uint32
pbuffer unsafe.Pointer
pstore *store.Store // postinglist store
......@@ -241,6 +243,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
posting := l.getPostingList()
l.maxMutationTs = posting.CommitTs()
l.hash = farm.Fingerprint32(key)
l.ghash = gotomic.IntKey(farm.Fingerprint64(key))
l.mlayer = make(map[int]types.Posting)
if clog == nil {
......@@ -647,8 +650,8 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
l.maxMutationTs = t.Timestamp.UnixNano()
if len(l.mindex)+len(l.mlayer) > 0 {
atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano())
if dirtyList != nil {
dirtyList.Push(l)
if dirtymap != nil {
dirtymap.Put(l.ghash, true)
}
}
if l.clog == nil {
......
......@@ -46,17 +46,20 @@ type counters struct {
func (c *counters) periodicLog() {
for _ = range c.ticker.C {
mapSize := lhmap.Size()
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
pending := added - merged
var pending uint64
if added > merged {
pending = added - merged
}
glog.WithFields(logrus.Fields{
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": mapSize,
"added": added,
"merged": merged,
"clean": atomic.LoadUint64(&c.clean),
"pending": pending,
"mapsize": lhmap.Size(),
"dirtysize": dirtymap.Size(),
}).Info("List Merge counters")
}
}
......@@ -98,17 +101,27 @@ func gentlyMerge(ms runtime.MemStats) {
ctr := NewCounters()
defer ctr.ticker.Stop()
count := 0
// Pick 400 keys from dirty map.
var hs []gotomic.Hashable
dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
hs = append(hs, k)
return len(hs) > 400
})
idx := 0
t := time.NewTicker(10 * time.Millisecond)
defer t.Stop()
for _ = range t.C {
count += 1
if count > 400 {
break // We're doing 100 per second. So, stop after 4 seconds.
if idx >= len(hs) {
break
}
ret, ok := dirtyList.Pop()
hid := hs[idx]
idx += 1
dirtymap.Delete(hid)
ret, ok := lhmap.Get(hid)
if !ok || ret == nil {
break
continue
}
// Not calling processOne, because we don't want to
// remove the postings list from the map, to avoid
......@@ -128,6 +141,10 @@ func checkMemoryUsage() {
MIN_MEMORY = *minmemory * MIB // Not being used right now.
for _ = range time.Tick(5 * time.Second) {
if atomic.LoadInt32(&pauseCheckMemory) == 1 {
continue
}
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
if ms.Alloc > MAX_MEMORY {
......@@ -139,15 +156,16 @@ func checkMemoryUsage() {
}
}
var pauseCheckMemory int32
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
var dirtyList *gotomic.List
var dirtymap *gotomic.Hash
var pstore *store.Store
var clog *commit.Logger
func Init(posting *store.Store, log *commit.Logger) {
lhmap = gotomic.NewHash()
dirtyList = gotomic.NewList()
dirtymap = gotomic.NewHash()
pstore = posting
clog = log
go checkMemoryUsage()
......@@ -203,15 +221,8 @@ func processOne(k gotomic.Hashable, c *counters) {
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
for dirtyList.Size() > 0 {
ret, ok := dirtyList.Pop()
if !ok || ret == nil {
continue
}
l := ret.(*List)
mergeAndUpdate(l, c)
}
// No need to go through dirtymap, because we're going through
// everything right now anyways.
for k := range ch {
processOne(k, c)
}
......@@ -231,6 +242,8 @@ func queueAll(ch chan gotomic.Hashable, c *counters) {
}
func MergeLists(numRoutines int) {
atomic.StoreInt32(&pauseCheckMemory, 1)
ch := make(chan gotomic.Hashable, 10000)
c := NewCounters()
go queueAll(ch, c)
......@@ -242,4 +255,5 @@ func MergeLists(numRoutines int) {
}
wg.Wait()
c.ticker.Stop()
atomic.StoreInt32(&pauseCheckMemory, 0)
}
......@@ -18,6 +18,8 @@ package rdf
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/dgraph-io/dgraph/lex"
......@@ -34,14 +36,21 @@ type NQuad struct {
Language string
}
func getUid(s string) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64)
}
return uid.GetOrAssign(s)
}
func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
sid, err := uid.GetOrAssign(nq.Subject)
sid, err := getUid(nq.Subject)
if err != nil {
return result, err
}
result.Entity = sid
if len(nq.ObjectId) > 0 {
oid, err := uid.GetOrAssign(nq.ObjectId)
oid, err := getUid(nq.ObjectId)
if err != nil {
return result, err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment