From baf842c924ff6748e342c9cbd418798c8db23d3b Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Wed, 25 Nov 2015 15:14:12 +1100 Subject: [PATCH] Switch dirtylist with dirtymap, because list causes a merge explosion, when the same posting list gets multiple updates. --- posting/list.go | 7 ++++-- posting/lists.go | 62 +++++++++++++++++++++++++++++------------------- rdf/parse.go | 13 ++++++++-- 3 files changed, 54 insertions(+), 28 deletions(-) diff --git a/posting/list.go b/posting/list.go index e9095a83..31c859b2 100644 --- a/posting/list.go +++ b/posting/list.go @@ -35,6 +35,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" "github.com/google/flatbuffers/go" + "github.com/zond/gotomic" ) var glog = x.Log("posting") @@ -57,6 +58,7 @@ type MutationLink struct { type List struct { sync.RWMutex key []byte + ghash gotomic.Hashable hash uint32 pbuffer unsafe.Pointer pstore *store.Store // postinglist store @@ -241,6 +243,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { posting := l.getPostingList() l.maxMutationTs = posting.CommitTs() l.hash = farm.Fingerprint32(key) + l.ghash = gotomic.IntKey(farm.Fingerprint64(key)) l.mlayer = make(map[int]types.Posting) if clog == nil { @@ -647,8 +650,8 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { l.maxMutationTs = t.Timestamp.UnixNano() if len(l.mindex)+len(l.mlayer) > 0 { atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano()) - if dirtyList != nil { - dirtyList.Push(l) + if dirtymap != nil { + dirtymap.Put(l.ghash, true) } } if l.clog == nil { diff --git a/posting/lists.go b/posting/lists.go index 94cd49f8..5d88c29a 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -46,17 +46,20 @@ type counters struct { func (c *counters) periodicLog() { for _ = range c.ticker.C { - mapSize := lhmap.Size() added := atomic.LoadUint64(&c.added) merged := atomic.LoadUint64(&c.merged) - pending := added - merged + var pending uint64 + if added > merged { + pending = added - merged + } glog.WithFields(logrus.Fields{ - "added": added, - "merged": merged, - "clean": atomic.LoadUint64(&c.clean), - "pending": pending, - "mapsize": mapSize, + "added": added, + "merged": merged, + "clean": atomic.LoadUint64(&c.clean), + "pending": pending, + "mapsize": lhmap.Size(), + "dirtysize": dirtymap.Size(), }).Info("List Merge counters") } } @@ -98,17 +101,27 @@ func gentlyMerge(ms runtime.MemStats) { ctr := NewCounters() defer ctr.ticker.Stop() - count := 0 + // Pick 400 keys from dirty map. + var hs []gotomic.Hashable + dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool { + hs = append(hs, k) + return len(hs) > 400 + }) + + idx := 0 t := time.NewTicker(10 * time.Millisecond) defer t.Stop() for _ = range t.C { - count += 1 - if count > 400 { - break // We're doing 100 per second. So, stop after 4 seconds. + if idx >= len(hs) { + break } - ret, ok := dirtyList.Pop() + hid := hs[idx] + idx += 1 + dirtymap.Delete(hid) + + ret, ok := lhmap.Get(hid) if !ok || ret == nil { - break + continue } // Not calling processOne, because we don't want to // remove the postings list from the map, to avoid @@ -128,6 +141,10 @@ func checkMemoryUsage() { MIN_MEMORY = *minmemory * MIB // Not being used right now. for _ = range time.Tick(5 * time.Second) { + if atomic.LoadInt32(&pauseCheckMemory) == 1 { + continue + } + var ms runtime.MemStats runtime.ReadMemStats(&ms) if ms.Alloc > MAX_MEMORY { @@ -139,15 +156,16 @@ func checkMemoryUsage() { } } +var pauseCheckMemory int32 var stopTheWorld sync.RWMutex var lhmap *gotomic.Hash -var dirtyList *gotomic.List +var dirtymap *gotomic.Hash var pstore *store.Store var clog *commit.Logger func Init(posting *store.Store, log *commit.Logger) { lhmap = gotomic.NewHash() - dirtyList = gotomic.NewList() + dirtymap = gotomic.NewHash() pstore = posting clog = log go checkMemoryUsage() @@ -203,15 +221,8 @@ func processOne(k gotomic.Hashable, c *counters) { // For on-demand merging of all lists. func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) { - for dirtyList.Size() > 0 { - ret, ok := dirtyList.Pop() - if !ok || ret == nil { - continue - } - l := ret.(*List) - mergeAndUpdate(l, c) - } - + // No need to go through dirtymap, because we're going through + // everything right now anyways. for k := range ch { processOne(k, c) } @@ -231,6 +242,8 @@ func queueAll(ch chan gotomic.Hashable, c *counters) { } func MergeLists(numRoutines int) { + atomic.StoreInt32(&pauseCheckMemory, 1) + ch := make(chan gotomic.Hashable, 10000) c := NewCounters() go queueAll(ch, c) @@ -242,4 +255,5 @@ func MergeLists(numRoutines int) { } wg.Wait() c.ticker.Stop() + atomic.StoreInt32(&pauseCheckMemory, 0) } diff --git a/rdf/parse.go b/rdf/parse.go index 5d4dbe9a..af27e4aa 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -18,6 +18,8 @@ package rdf import ( "fmt" + "strconv" + "strings" "time" "github.com/dgraph-io/dgraph/lex" @@ -34,14 +36,21 @@ type NQuad struct { Language string } +func getUid(s string) (uint64, error) { + if strings.HasPrefix(s, "_uid_:") { + return strconv.ParseUint(s[6:], 0, 64) + } + return uid.GetOrAssign(s) +} + func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := uid.GetOrAssign(nq.Subject) + sid, err := getUid(nq.Subject) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := uid.GetOrAssign(nq.ObjectId) + oid, err := getUid(nq.ObjectId) if err != nil { return result, err } -- GitLab