diff --git a/commit/log.go b/commit/log.go
index 30583308b2912f0a772dd408d05da438a7fe4214..89b7964ecab01f4f356477e564ad82e0e6c17317 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -124,9 +124,6 @@ type Logger struct {
 	// Sync every d duration.
 	SyncDur time.Duration
 
-	// Skip write to commit log to allow for testing.
-	skipWrite int32
-
 	sync.RWMutex
 	list []*logFile
 	cf   *CurFile
@@ -140,15 +137,6 @@ func (l *Logger) curFile() *CurFile {
 	return l.cf
 }
 
-func (l *Logger) SetSkipWrite(val bool) {
-	var v int32
-	v = 0
-	if val {
-		v = 1
-	}
-	atomic.StoreInt32(&l.skipWrite, v)
-}
-
 func (l *Logger) updateLastLogTs(val int64) {
 	for {
 		prev := atomic.LoadInt64(&l.lastLogTs)
@@ -434,9 +422,6 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
 	if ts < atomic.LoadInt64(&l.lastLogTs) {
 		return fmt.Errorf("Timestamp lower than last log timestamp.")
 	}
-	if atomic.LoadInt32(&l.skipWrite) == 1 {
-		return nil
-	}
 
 	buf := new(bytes.Buffer)
 	var err error
@@ -525,11 +510,13 @@ func streamEntries(cache *Cache,
 
 type LogIterator func(record []byte)
 
-// Always run this method in it's own goroutine. Otherwise, your program
-// will just hang waiting on channels.
 func (l *Logger) StreamEntries(afterTs int64, hash uint32,
 	iter LogIterator) error {
 
+	if atomic.LoadInt64(&l.lastLogTs) < afterTs {
+		return nil
+	}
+
 	var wg sync.WaitGroup
 	l.RLock()
 	for _, lf := range l.list {
diff --git a/posting/list.go b/posting/list.go
index a1b6a29a74dfbc9058bb4f32e0899d1041340014..9c162f4f8a6216e6efb165a6b3c0016342bef3dc 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -70,6 +70,7 @@ type List struct {
 	mdelta        int   // len(plist) + mdelta = final length.
 	maxMutationTs int64 // Track maximum mutation ts.
 	mindex        []*MutationLink
+	dirtyTs       int64 // Use atomics for this.
 }
 
 func NewList() *List {
@@ -242,6 +243,9 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
 	l.hash = farm.Fingerprint32(key)
 	l.mlayer = make(map[int]types.Posting)
 
+	if clog == nil {
+		return
+	}
 	glog.Debug("Starting stream entries...")
 	err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) {
 		uo := flatbuffers.GetUOffsetT(buffer)
@@ -638,31 +642,38 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
 	l.mergeMutation(mpost)
 	l.maxMutationTs = t.Timestamp.UnixNano()
 
+	if len(l.mindex)+len(l.mlayer) > 0 {
+		atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano())
+		if dirtyList != nil {
+			dirtyList.Push(l)
+		}
+	}
+	if l.clog == nil {
+		return nil
+	}
 	return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
 }
 
-func (l *List) IsDirty() bool {
-	// We can avoid checking for init here.
-	l.RLock()
-	defer l.RUnlock()
-	return len(l.mindex)+len(l.mlayer) > 0
-}
-
-func (l *List) MergeIfDirty() error {
-	if !l.IsDirty() {
+func (l *List) MergeIfDirty() (merged bool, err error) {
+	if atomic.LoadInt64(&l.dirtyTs) == 0 {
 		glog.WithField("dirty", false).Debug("Not Committing")
-		return nil
+		return false, nil
 	} else {
 		glog.WithField("dirty", true).Debug("Committing")
 	}
 	return l.merge()
 }
 
-func (l *List) merge() error {
+func (l *List) merge() (merged bool, rerr error) {
 	l.wg.Wait()
 	l.Lock()
 	defer l.Unlock()
 
+	if len(l.mindex)+len(l.mlayer) == 0 {
+		atomic.StoreInt64(&l.dirtyTs, 0)
+		return false, nil
+	}
+
 	var p types.Posting
 	sz := l.length()
 	b := flatbuffers.NewBuilder(0)
@@ -686,17 +697,18 @@ func (l *List) merge() error {
 	b.Finish(end)
 
 	if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil {
-		glog.WithField("error", err).Errorf("While storing posting list")
-		return err
+		glog.WithField("error", err).Fatal("While storing posting list")
+		return true, err
 	}
 
 	// Now reset the mutation variables.
 	atomic.StorePointer(&l.pbuffer, nil) // Make prev buffer eligible for GC.
+	atomic.StoreInt64(&l.dirtyTs, 0)     // Set as clean.
 	l.lastCompact = time.Now()
 	l.mlayer = make(map[int]types.Posting)
 	l.mdelta = 0
 	l.mindex = nil
-	return nil
+	return true, nil
 }
 
 func (l *List) LastCompactionTs() time.Time {
diff --git a/posting/list_test.go b/posting/list_test.go
index 7c608107a271ca3de0ddc160637891b0428416b4..a64fc8951c33822fb2b5ab814e7e65aaad512ef5 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -181,7 +181,7 @@ func TestAddMutation(t *testing.T) {
 		t.Error(err)
 	}
 
-	if err := dl.MergeIfDirty(); err != nil {
+	if _, err := dl.MergeIfDirty(); err != nil {
 		t.Error(err)
 	}
 	if err := checkUids(t, dl, uids...); err != nil {
@@ -234,7 +234,7 @@ func TestAddMutation_Value(t *testing.T) {
 	}
 
 	// Run the same check after committing.
-	if err := ol.MergeIfDirty(); err != nil {
+	if _, err := ol.MergeIfDirty(); err != nil {
 		t.Error(err)
 	}
 	{
diff --git a/posting/lists.go b/posting/lists.go
index 99c9d433b52f6771a945e9dbfa3ba6a9d3849bb2..94cd49f827580b6a8ec894a84c1967aed2c3e7ee 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -18,7 +18,6 @@ package posting
 
 import (
 	"flag"
-	"math/rand"
 	"runtime"
 	"runtime/debug"
 	"sync"
@@ -32,13 +31,17 @@ import (
 	"github.com/zond/gotomic"
 )
 
-var maxmemory = flag.Uint64("threshold_ram_mb", 3072,
+var minmemory = flag.Uint64("min_ram_mb", 2048,
+	"If RAM usage exceeds this, start periodically evicting posting lists"+
+		" from memory.")
+var maxmemory = flag.Uint64("max_ram_mb", 4096,
 	"If RAM usage exceeds this, we stop the world, and flush our buffers.")
 
 type counters struct {
 	ticker *time.Ticker
 	added  uint64
 	merged uint64
+	clean  uint64
 }
 
 func (c *counters) periodicLog() {
@@ -51,56 +54,100 @@ func (c *counters) periodicLog() {
 		glog.WithFields(logrus.Fields{
 			"added":   added,
 			"merged":  merged,
+			"clean":   atomic.LoadUint64(&c.clean),
 			"pending": pending,
 			"mapsize": mapSize,
 		}).Info("List Merge counters")
 	}
 }
 
-var MAX_MEMORY uint64
-var MIB uint64
+func NewCounters() *counters {
+	c := new(counters)
+	c.ticker = time.NewTicker(time.Second)
+	go c.periodicLog()
+	return c
+}
+
+var MIB, MAX_MEMORY, MIN_MEMORY uint64
+
+func aggressivelyEvict(ms runtime.MemStats) {
+	// Okay, we exceed the max memory threshold.
+	// Stop the world, and deal with this first.
+	stopTheWorld.Lock()
+	defer stopTheWorld.Unlock()
+
+	megs := ms.Alloc / MIB
+	glog.WithField("allocated_MB", megs).
+		Info("Memory usage over threshold. STOPPED THE WORLD!")
+
+	glog.Info("Calling merge on all lists.")
+	MergeLists(100 * runtime.GOMAXPROCS(-1))
+
+	glog.Info("Merged lists. Calling GC.")
+	runtime.GC() // Call GC to do some cleanup.
+	glog.Info("Trying to free OS memory")
+	debug.FreeOSMemory()
+
+	runtime.ReadMemStats(&ms)
+	megs = ms.Alloc / MIB
+	glog.WithField("allocated_MB", megs).
+		Info("Memory Usage after calling GC.")
+}
+
+func gentlyMerge(ms runtime.MemStats) {
+	ctr := NewCounters()
+	defer ctr.ticker.Stop()
+
+	count := 0
+	t := time.NewTicker(10 * time.Millisecond)
+	defer t.Stop()
+	for _ = range t.C {
+		count += 1
+		if count > 400 {
+			break // We're doing 100 per second. So, stop after 4 seconds.
+		}
+		ret, ok := dirtyList.Pop()
+		if !ok || ret == nil {
+			break
+		}
+		// Not calling processOne, because we don't want to
+		// remove the postings list from the map, to avoid
+		// a race condition, where another caller re-creates the
+		// posting list before a merge happens.
+		l := ret.(*List)
+		if l == nil {
+			continue
+		}
+		mergeAndUpdate(l, ctr)
+	}
+}
 
 func checkMemoryUsage() {
 	MIB = 1 << 20
-	MAX_MEMORY = *maxmemory * (1 << 20)
+	MAX_MEMORY = *maxmemory * MIB
+	MIN_MEMORY = *minmemory * MIB // Not being used right now.
 
 	for _ = range time.Tick(5 * time.Second) {
 		var ms runtime.MemStats
 		runtime.ReadMemStats(&ms)
-		if ms.Alloc < MAX_MEMORY {
-			continue
-		}
-
-		// Okay, we exceed the max memory threshold.
-		// Stop the world, and deal with this first.
-		stopTheWorld.Lock()
-		megs := ms.Alloc / MIB
-		glog.WithField("allocated_MB", megs).
-			Info("Memory usage over threshold. STOPPED THE WORLD!")
+		if ms.Alloc > MAX_MEMORY {
+			aggressivelyEvict(ms)
 
-		glog.Info("Calling merge on all lists.")
-		MergeLists(100 * runtime.GOMAXPROCS(-1))
-
-		glog.Info("Merged lists. Calling GC.")
-		runtime.GC() // Call GC to do some cleanup.
-		glog.Info("Trying to free OS memory")
-		debug.FreeOSMemory()
-
-		runtime.ReadMemStats(&ms)
-		megs = ms.Alloc / MIB
-		glog.WithField("allocated_MB", megs).
-			Info("Memory Usage after calling GC.")
-		stopTheWorld.Unlock()
+		} else {
+			gentlyMerge(ms)
+		}
 	}
 }
 
 var stopTheWorld sync.RWMutex
 var lhmap *gotomic.Hash
+var dirtyList *gotomic.List
 var pstore *store.Store
 var clog *commit.Logger
 
 func Init(posting *store.Store, log *commit.Logger) {
 	lhmap = gotomic.NewHash()
+	dirtyList = gotomic.NewList()
 	pstore = posting
 	clog = log
 	go checkMemoryUsage()
@@ -127,28 +174,48 @@ func GetOrCreate(key []byte) *List {
 	}
 }
 
+func mergeAndUpdate(l *List, c *counters) {
+	if l == nil {
+		return
+	}
+	if merged, err := l.MergeIfDirty(); err != nil {
+		glog.WithError(err).Error("While commiting dirty list.")
+	} else if merged {
+		atomic.AddUint64(&c.merged, 1)
+	} else {
+		atomic.AddUint64(&c.clean, 1)
+	}
+}
+
 func processOne(k gotomic.Hashable, c *counters) {
 	ret, _ := lhmap.Delete(k)
 	if ret == nil {
 		return
 	}
 	l := ret.(*List)
+
 	if l == nil {
 		return
 	}
 
-	l.SetForDeletion() // No more AddMutation.
-	if err := l.MergeIfDirty(); err != nil {
-		glog.WithError(err).Error("While commiting dirty list.")
-	}
-	atomic.AddUint64(&c.merged, 1)
+	mergeAndUpdate(l, c)
 }
 
 // For on-demand merging of all lists.
 func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
-	for l := range ch {
-		processOne(l, c)
+	for dirtyList.Size() > 0 {
+		ret, ok := dirtyList.Pop()
+		if !ok || ret == nil {
+			continue
+		}
+		l := ret.(*List)
+		mergeAndUpdate(l, c)
+	}
+
+	for k := range ch {
+		processOne(k, c)
 	}
+
 	if wg != nil {
 		wg.Done()
 	}
@@ -165,9 +232,7 @@ func queueAll(ch chan gotomic.Hashable, c *counters) {
 
 func MergeLists(numRoutines int) {
 	ch := make(chan gotomic.Hashable, 10000)
-	c := new(counters)
-	c.ticker = time.NewTicker(time.Second)
-	go c.periodicLog()
+	c := NewCounters()
 	go queueAll(ch, c)
 
 	wg := new(sync.WaitGroup)
@@ -178,55 +243,3 @@ func MergeLists(numRoutines int) {
 	wg.Wait()
 	c.ticker.Stop()
 }
-
-// For periodic merging of lists.
-func queueRandomLists(ch chan gotomic.Hashable, c *counters) {
-	var buf []gotomic.Hashable
-	var count int
-	needed := cap(ch) - len(ch)
-	if needed < 100 {
-		return
-	}
-
-	// Generate a random list of
-	lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
-		if count < needed {
-			buf = append(buf, k)
-
-		} else {
-			j := rand.Intn(count)
-			if j < len(buf) {
-				buf[j] = k
-			}
-		}
-		count += 1
-		return false
-	})
-
-	for _, k := range buf {
-		ch <- k
-		atomic.AddUint64(&c.added, 1)
-	}
-}
-
-func periodicQueueForProcessing(ch chan gotomic.Hashable, c *counters) {
-	ticker := time.NewTicker(time.Minute)
-	for _ = range ticker.C {
-		queueRandomLists(ch, c)
-	}
-}
-
-func periodicProcess(ch chan gotomic.Hashable, c *counters) {
-	ticker := time.NewTicker(100 * time.Millisecond)
-	for _ = range ticker.C {
-		hid := <-ch
-		processOne(hid, c)
-	}
-}
-
-func StartPeriodicMerging() {
-	ctr := new(counters)
-	ch := make(chan gotomic.Hashable, 10000)
-	go periodicQueueForProcessing(ch, ctr)
-	go periodicProcess(ch, ctr)
-}
diff --git a/server/loader/main.go b/server/loader/main.go
index 2390f484619a24d983841eb1980f141df45d8744..c0c46cc915a63fe82ef443acfd72fec8a05e0263 100644
--- a/server/loader/main.go
+++ b/server/loader/main.go
@@ -25,7 +25,6 @@ import (
 	"strings"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/dgraph-io/dgraph/commit"
 	"github.com/dgraph-io/dgraph/loader"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
@@ -40,7 +39,6 @@ var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
 var numgo = flag.Int("numgo", 4,
 	"Number of goroutines to use for reading file.")
 var postingDir = flag.String("postings", "", "Directory to store posting lists")
-var mutationDir = flag.String("mutations", "", "Directory to store mutations")
 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
 
 func main() {
@@ -71,11 +69,7 @@ func main() {
 	ps.Init(*postingDir)
 	defer ps.Close()
 
-	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SetSkipWrite(true) // Don't write to commit logs.
-	clog.Init()
-	defer clog.Close()
-	posting.Init(ps, clog)
+	posting.Init(ps, nil)
 
 	files := strings.Split(*rdfGzips, ",")
 	for _, path := range files {
@@ -95,7 +89,7 @@ func main() {
 
 	count, err := loader.HandleRdfReader(r, *mod)
 	if err != nil {
-		glog.Fatal(err)
+		glog.WithError(err).Fatal("While handling rdf reader.")
 	}
 	glog.WithField("count", count).Info("RDFs parsed")
 	r.Close()
diff --git a/server/main.go b/server/main.go
index 1fc3b65d3c7fc15da0308bd67ef611e4f6e56e10..1a258b76165f6399fbe4af37241f1f21a6006e51 100644
--- a/server/main.go
+++ b/server/main.go
@@ -107,13 +107,11 @@ func main() {
 	defer ps.Close()
 
 	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
-	clog.SetSkipWrite(false)
 	clog.SyncEvery = 1
 	clog.Init()
 	defer clog.Close()
 
 	posting.Init(ps, clog)
-	// posting.StartPeriodicMerging()
 
 	http.HandleFunc("/query", queryHandler)
 	glog.WithField("port", *port).Info("Listening for requests...")