From 14548ae4c4118bceb76648a3a680a2b129b8da10 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Tue, 10 Jul 2018 17:29:45 -0700
Subject: [PATCH] Work so far on consolidating rolling up of posting lists.

---
 posting/doc.go         |  16 +++
 posting/index.go       |   1 +
 posting/list.go        | 229 ++++++++++++++++-------------------------
 posting/lists.go       |  73 +++++++------
 posting/lru.go         |  17 +--
 posting/mvcc.go        |  62 +----------
 posting/rollup.go      | 184 +++++++++++++++++++++++++++++++++
 worker/draft.go        |   6 +-
 worker/predicate.go    |   2 +-
 worker/rollup.go       |  81 +++++++++++++++
 worker/stream_lists.go |   4 +-
 11 files changed, 423 insertions(+), 252 deletions(-)
 create mode 100644 posting/rollup.go
 create mode 100644 worker/rollup.go

diff --git a/posting/doc.go b/posting/doc.go
index 1887a3cb..812691a2 100644
--- a/posting/doc.go
+++ b/posting/doc.go
@@ -7,4 +7,20 @@
 
 // Package posting takes care of posting lists. It contains logic for mutation
 // layers, merging them with BadgerDB, etc.
+//
+// Q. How should we do posting list rollups?
+// Posting lists are stored such that we have an immutable state, with mutations layered on top of
+// it. In Dgraph, we only store committed mutations to disk; uncommitted mutations are kept only in
+// memory. Thus, we have the immutable state + mutations as different versions in Badger.
+// Periodically, we roll up the mutations into the immutable state, to make reads faster. The
+// question is when we should do these rollups.
+//
+// We can do rollups when snapshots happen. At that time, we can first iterate over all the posting
+// lists in memory, and call rollup(snapshotTs) on them. Then, we can do a key-only iteration over
+// Badger, and call rollup(snapshotTs) on any posting list which doesn't have a complete state as
+// its first version.
+//
+// Any request for a snapshot can then be served directly by reading only the entries up to
+// snapshotTs. This would be simple to understand, and consistent within the entire Raft group.
+// We can also get rid of SyncIfDirty calls, because all commits are already stored on disk.
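+//
+// In this change, the rollup is triggered from worker/draft.go once a snapshot proposal has been
+// applied. A rough sketch of that call site (see worker/rollup.go for rollupPostingLists):
+//
+//	if err := n.Store.CreateSnapshot(snap.Index, n.ConfState(), data); err != nil {
+//		return err
+//	}
+//	// Roll up all posting lists which still carry deltas, below the min pending start ts.
+//	return rollupPostingLists(snap.MinPendingStartTs - 1)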
 package posting
diff --git a/posting/index.go b/posting/index.go
index 99e5031e..de4f5f6f 100644
--- a/posting/index.go
+++ b/posting/index.go
@@ -862,6 +862,7 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
 			txn := &Txn{StartTs: startTs}
 			for it := range ch {
 				addPostingsToIndex(it.uid, it.list, txn)
+				// NOTE: This has to be in memory for performance.
 				err = txn.CommitMutationsMemory(ctx, txn.StartTs)
 				if err != nil {
 					txn.AbortMutations(ctx)
diff --git a/posting/list.go b/posting/list.go
index 24e202e1..6bac28cb 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -53,7 +53,7 @@ const (
 
 	// Metadata Bit which is stored to find out whether the stored value is pl or byte slice.
 	BitUidPosting      byte = 0x01
-	bitDeltaPosting    byte = 0x04
+	BitDeltaPosting    byte = 0x04
 	BitCompletePosting byte = 0x08
 	BitEmptyPosting    byte = 0x10 | BitCompletePosting
 )
@@ -414,13 +414,13 @@ func (l *List) commitMutation(ctx context.Context, startTs, commitTs uint64) err
 	delete(l.activeTxns, startTs)
 
 	// Calculate 5% of immutable layer
-	numUids := (bp128.NumIntegers(l.plist.Uids) * 5) / 100
-	if numUids < 1000 {
-		numUids = 1000
-	}
-	if l.numCommits > numUids {
-		l.syncIfDirty(false)
-	}
+	// numUids := (bp128.NumIntegers(l.plist.Uids) * 5) / 100
+	// if numUids < 1000 {
+	// 	numUids = 1000
+	// }
+	// if l.numCommits > numUids {
+	// 	l.syncIfDirty(false)
+	// }
 	return nil
 }
 
@@ -618,24 +618,25 @@ func doAsyncWrite(commitTs uint64, key []byte, data []byte, meta byte, f func(er
 	}
 }
 
+// TODO: Remove MarshalToKv.
 func (l *List) MarshalToKv() (*intern.KV, error) {
 	l.Lock()
 	defer l.Unlock()
 	x.AssertTrue(len(l.activeTxns) == 0)
-	if err := l.rollup(); err != nil {
+	if _, err := l.Rollup(); err != nil {
 		return nil, err
 	}
 
 	kv := &intern.KV{}
 	kv.Version = l.minTs
 	kv.Key = l.key
-	val, meta := marshalPostingList(l.plist)
+	val, meta := MarshalPostingList(l.plist)
 	kv.UserMeta = []byte{meta}
 	kv.Val = val
 	return kv, nil
 }
 
-func marshalPostingList(plist *intern.PostingList) (data []byte, meta byte) {
+func MarshalPostingList(plist *intern.PostingList) (data []byte, meta byte) {
 	if len(plist.Uids) == 0 {
 		data = nil
 		meta = meta | BitEmptyPosting
@@ -651,137 +652,81 @@ func marshalPostingList(plist *intern.PostingList) (data []byte, meta byte) {
 	return
 }
 
-// Merge all entries in mutation layer with commitTs <= l.commitTs
-// into immutable layer.
-func (l *List) rollup() error {
-	l.AssertLock()
-	final := new(intern.PostingList)
-	var bp bp128.BPackEncoder
-	buf := make([]uint64, 0, bp128.BlockSize)
-
-	// Pick all committed entries
-	x.AssertTrue(l.minTs <= l.commitTs)
-	err := l.iterate(l.commitTs, 0, func(p *intern.Posting) bool {
-		// iterate already takes care of not returning entries whose commitTs is above l.commitTs.
-		// So, we don't need to do any filtering here. In fact, doing filtering here could result
-		// in a bug.
-		buf = append(buf, p.Uid)
-		if len(buf) == bp128.BlockSize {
-			bp.PackAppend(buf)
-			buf = buf[:0]
-		}
-
-		// We want to add the posting if it has facets or has a value.
-		if p.Facets != nil || p.PostingType != intern.Posting_REF || len(p.Label) != 0 {
-			// I think it's okay to take the pointer from the iterator, because we have a lock
-			// over List; which won't be released until final has been marshalled. Thus, the
-			// underlying data wouldn't be changed.
-			final.Postings = append(final.Postings, p)
-		}
-		return true
-	})
-	x.Check(err)
-	if len(buf) > 0 {
-		bp.PackAppend(buf)
-	}
-	sz := bp.Size()
-	if sz > 0 {
-		final.Uids = make([]byte, sz)
-		// TODO: Add bytes method
-		bp.WriteTo(final.Uids)
-	}
-	// Keep all uncommited Entries or postings with commitTs > l.commitTs
-	// in mutation map. Discard all else.
-	for startTs, plist := range l.mutationMap {
-		cl := plist.Commit
-		if cl == 0 || cl > l.commitTs {
-			// Keep this.
-		} else {
-			delete(l.mutationMap, startTs)
-		}
-	}
-
-	l.minTs = l.commitTs
-	l.plist = final
-	l.numCommits = 0
-	atomic.StoreInt32(&l.estimatedSize, l.calculateSize())
-	return nil
-}
-
-func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
-	l.Lock()
-	defer l.Unlock()
-	return l.syncIfDirty(delFromCache)
-}
-
-// Merge mutation layer and immutable layer.
-func (l *List) syncIfDirty(delFromCache bool) (committed bool, err error) {
-	// We no longer set posting list to empty.
-	if len(l.mutationMap) == 0 {
-		return false, nil
-	}
-	if delFromCache {
-		// Don't evict if there is pending transaction.
-		x.AssertTrue(len(l.activeTxns) == 0)
-	}
-
-	lmlayer := len(l.mutationMap)
-	// Merge all entries in mutation layer with commitTs <= l.commitTs
-	// into immutable layer.
-	if err := l.rollup(); err != nil {
-		return false, err
-	}
-	// Check if length of mutationMap has changed after rollup, else skip writing to disk.
-	if len(l.mutationMap) == lmlayer {
-		// There was no change in immutable layer.
-		return false, nil
-	}
-	x.AssertTrue(l.minTs > 0)
-
-	data, meta := marshalPostingList(l.plist)
-	for {
-		pLen := atomic.LoadInt64(&x.MaxPlSz)
-		if int64(len(data)) <= pLen {
-			break
-		}
-		if atomic.CompareAndSwapInt64(&x.MaxPlSz, pLen, int64(len(data))) {
-			x.MaxPlSize.Set(int64(len(data)))
-			x.MaxPlLength.Set(int64(bp128.NumIntegers(l.plist.Uids)))
-			break
-		}
-	}
-
-	// Copy this over because minTs can change by the time callback returns.
-	minTs := l.minTs
-	retries := 0
-	var f func(error)
-	f = func(err error) {
-		if err != nil {
-			x.Printf("Got err in while doing async writes in SyncIfDirty: %+v", err)
-			if retries > 5 {
-				x.Fatalf("Max retries exceeded while doing async write for key: %s, err: %+v",
-					l.key, err)
-			}
-			// Error from badger should be temporary, so we can retry.
-			retries += 1
-			doAsyncWrite(minTs, l.key, data, meta, f)
-			return
-		}
-		if atomic.LoadInt32(&l.onDisk) == 0 {
-			btree.Delete(l.key)
-			atomic.StoreInt32(&l.onDisk, 1)
-		}
-		x.BytesWrite.Add(int64(len(data)))
-		x.PostingWrites.Add(1)
-		if delFromCache {
-			x.AssertTrue(atomic.LoadInt32(&l.deleteMe) == 1)
-			lcache.delete(l.key)
-		}
-	}
-
-	doAsyncWrite(minTs, l.key, data, meta, f)
-	return true, nil
-}
+// func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
+// 	l.Lock()
+// 	defer l.Unlock()
+// 	return l.syncIfDirty(delFromCache)
+// }
+
+// // TODO: Remove SyncIfDirty.
+// // Merge mutation layer and immutable layer.
+// func (l *List) syncIfDirty(delFromCache bool) (committed bool, err error) {
+// 	// We no longer set posting list to empty.
+// 	if len(l.mutationMap) == 0 {
+// 		return false, nil
+// 	}
+// 	if delFromCache {
+// 		// Don't evict if there is pending transaction.
+// 		x.AssertTrue(len(l.activeTxns) == 0)
+// 	}
+
+// 	lmlayer := len(l.mutationMap)
+// 	// Merge all entries in mutation layer with commitTs <= l.commitTs
+// 	// into immutable layer.
+// 	if _, err := l.Rollup(); err != nil {
+// 		return false, err
+// 	}
+// 	// Check if length of mutationMap has changed after rollup, else skip writing to disk.
+// 	if len(l.mutationMap) == lmlayer {
+// 		// There was no change in immutable layer.
+// 		return false, nil
+// 	}
+// 	x.AssertTrue(l.minTs > 0)
+
+// 	data, meta := MarshalPostingList(l.plist)
+// 	for {
+// 		pLen := atomic.LoadInt64(&x.MaxPlSz)
+// 		if int64(len(data)) <= pLen {
+// 			break
+// 		}
+// 		if atomic.CompareAndSwapInt64(&x.MaxPlSz, pLen, int64(len(data))) {
+// 			x.MaxPlSize.Set(int64(len(data)))
+// 			x.MaxPlLength.Set(int64(bp128.NumIntegers(l.plist.Uids)))
+// 			break
+// 		}
+// 	}
+
+// 	// Copy this over because minTs can change by the time callback returns.
+// 	minTs := l.minTs
+// 	retries := 0
+// 	var f func(error)
+// 	f = func(err error) {
+// 		if err != nil {
+// 			x.Printf("Got err in while doing async writes in SyncIfDirty: %+v", err)
+// 			if retries > 5 {
+// 				x.Fatalf("Max retries exceeded while doing async write for key: %s, err: %+v",
+// 					l.key, err)
+// 			}
+// 			// Error from badger should be temporary, so we can retry.
+// 			retries += 1
+// 			doAsyncWrite(minTs, l.key, data, meta, f)
+// 			return
+// 		}
+// 		if atomic.LoadInt32(&l.onDisk) == 0 {
+// 			btree.Delete(l.key)
+// 			atomic.StoreInt32(&l.onDisk, 1)
+// 		}
+// 		x.BytesWrite.Add(int64(len(data)))
+// 		x.PostingWrites.Add(1)
+// 		if delFromCache {
+// 			x.AssertTrue(atomic.LoadInt32(&l.deleteMe) == 1)
+// 			lcache.delete(l.key)
+// 		}
+// 	}
+
+// 	doAsyncWrite(minTs, l.key, data, meta, f)
+// 	return true, nil
+// }
 
 // Copies the val if it's uid only posting, be careful
 func UnmarshalOrCopy(val []byte, metadata byte, pl *intern.PostingList) {
diff --git a/posting/lists.go b/posting/lists.go
index d8ea7100..e8430823 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -17,7 +17,6 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -274,39 +273,39 @@ func EvictLRU() {
 	lcache.Reset()
 }
 
-func CommitLists(commit func(key []byte) bool) {
-	// We iterate over lru and pushing values (List) into this
-	// channel. Then goroutines right below will commit these lists to data store.
-	workChan := make(chan *List, 10000)
-
-	var wg sync.WaitGroup
-	for i := 0; i < 10; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for l := range workChan {
-				l.SyncIfDirty(false)
-			}
-		}()
-	}
-
-	lcache.iterate(func(l *List) bool {
-		if commit(l.key) {
-			workChan <- l
-		}
-		return true
-	})
-	close(workChan)
-	wg.Wait()
-
-	// The following hack ensures that all the asynchrously run commits above would have been done
-	// before this completes. Badger now actually gets rid of keys, which are deleted. So, we can
-	// use the Delete function.
-	txn := pstore.NewTransactionAt(1, true)
-	defer txn.Discard()
-	x.Check(txn.Delete(x.DataKey("_dummy_", 1)))
-	// Nothing is being read, so there can't be an ErrConflict. This should go to disk.
-	if err := txn.CommitAt(1, nil); err != nil {
-		x.Printf("Commit unexpectedly failed with error: %v", err)
-	}
-}
+// func CommitLists(commit func(key []byte) bool) {
+// 	// We iterate over lru and pushing values (List) into this
+// 	// channel. Then goroutines right below will commit these lists to data store.
+// 	workChan := make(chan *List, 10000)
+
+// 	var wg sync.WaitGroup
+// 	for i := 0; i < 10; i++ {
+// 		wg.Add(1)
+// 		go func() {
+// 			defer wg.Done()
+// 			for l := range workChan {
+// 				l.SyncIfDirty(false)
+// 			}
+// 		}()
+// 	}
+
+// 	lcache.iterate(func(l *List) bool {
+// 		if commit(l.key) {
+// 			workChan <- l
+// 		}
+// 		return true
+// 	})
+// 	close(workChan)
+// 	wg.Wait()
+
+// 	// The following hack ensures that all the asynchrously run commits above would have been done
+// 	// before this completes. Badger now actually gets rid of keys, which are deleted. So, we can
+// 	// use the Delete function.
+// 	txn := pstore.NewTransactionAt(1, true)
+// 	defer txn.Discard()
+// 	x.Check(txn.Delete(x.DataKey("_dummy_", 1)))
+// 	// Nothing is being read, so there can't be an ErrConflict. This should go to disk.
+// 	if err := txn.CommitAt(1, nil); err != nil {
+// 		x.Printf("Commit unexpectedly failed with error: %v", err)
+// 	}
+// }
diff --git a/posting/lru.go b/posting/lru.go
index a7133bac..671d863a 100644
--- a/posting/lru.go
+++ b/posting/lru.go
@@ -136,14 +136,15 @@ func (c *listCache) removeOldest() {
 			ele = ele.Prev()
 			continue
 		}
-		// If length of mutation layer is zero, then we won't call pstore.SetAsync and the
-		// key wont be deleted from cache. So lets delete it now if SyncIfDirty returns false.
-		if committed, err := e.pl.SyncIfDirty(true); err != nil {
-			ele = ele.Prev()
-			continue
-		} else if !committed {
-			delete(c.cache, e.key)
-		}
+		delete(c.cache, e.key)
+		// // If length of mutation layer is zero, then we won't call pstore.SetAsync and the
+		// // key wont be deleted from cache. So lets delete it now if SyncIfDirty returns false.
+		// if committed, err := e.pl.SyncIfDirty(true); err != nil {
+		// 	ele = ele.Prev()
+		// 	continue
+		// } else if !committed {
+		// delete(c.cache, e.key)
+		// }
 
 		// ele gets Reset once it's passed to Remove, so store the prev.
 		prev := ele.Prev()
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 326f1669..c509767f 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -119,7 +119,7 @@ func (tx *Txn) CommitMutations(ctx context.Context, commitTs uint64) error {
 				copy(pl.Postings[midx+1:], pl.Postings[midx:])
 				pl.Postings[midx] = d.posting
 			}
-			meta = bitDeltaPosting
+			meta = BitDeltaPosting
 		}
 
 		// delta postings are pointers to the postings present in the Pl present in lru.
@@ -221,66 +221,6 @@ func unmarshalOrCopy(plist *intern.PostingList, item *badger.Item) error {
 	return nil
 }
 
-// constructs the posting list from the disk using the passed iterator.
-// Use forward iterator with allversions enabled in iter options.
-//
-// key would now be owned by the posting list. So, ensure that it isn't reused
-// elsewhere.
-func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
-	l := new(List)
-	l.key = key
-	l.mutationMap = make(map[uint64]*intern.PostingList)
-	l.activeTxns = make(map[uint64]struct{})
-	l.plist = new(intern.PostingList)
-
-	// Iterates from highest Ts to lowest Ts
-	for it.Valid() {
-		item := it.Item()
-		if item.IsDeletedOrExpired() {
-			// Don't consider any more versions.
-			break
-		}
-		if !bytes.Equal(item.Key(), l.key) {
-			break
-		}
-		if l.commitTs == 0 {
-			l.commitTs = item.Version()
-		}
-
-		val, err := item.Value()
-		if err != nil {
-			return nil, err
-		}
-		if item.UserMeta()&BitCompletePosting > 0 {
-			if err := unmarshalOrCopy(l.plist, item); err != nil {
-				return nil, err
-			}
-			l.minTs = item.Version()
-			// No need to do Next here. The outer loop can take care of skipping more versions of
-			// the same key.
-			break
-		}
-		if item.UserMeta()&bitDeltaPosting > 0 {
-			pl := &intern.PostingList{}
-			x.Check(pl.Unmarshal(val))
-			pl.Commit = item.Version()
-			for _, mpost := range pl.Postings {
-				// commitTs, startTs are meant to be only in memory, not
-				// stored on disk.
-				mpost.CommitTs = item.Version()
-			}
-			l.mutationMap[pl.Commit] = pl
-		} else {
-			x.Fatalf("unexpected meta: %d", item.UserMeta())
-		}
-		if item.DiscardEarlierVersions() {
-			break
-		}
-		it.Next()
-	}
-	return l, nil
-}
-
 func getNew(key []byte, pstore *badger.ManagedDB) (*List, error) {
 	l := new(List)
 	l.key = key
diff --git a/posting/rollup.go b/posting/rollup.go
new file mode 100644
index 00000000..af3a65f0
--- /dev/null
+++ b/posting/rollup.go
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018 Dgraph Labs, Inc.
+ *
+ * This file is available under the Apache License, Version 2.0,
+ * with the Commons Clause restriction.
+ */
+
+package posting
+
+import (
+	"bytes"
+	"math"
+
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/dgraph/bp128"
+	"github.com/dgraph-io/dgraph/protos/intern"
+	"github.com/dgraph-io/dgraph/x"
+)
+
+// ReadPostingList constructs the posting list from disk using the passed iterator.
+// Use a forward iterator with AllVersions enabled in the iterator options.
+//
+// key is now owned by the posting list, so ensure that it isn't reused
+// elsewhere.
+func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
+	l := new(List)
+	l.key = key
+	l.mutationMap = make(map[uint64]*intern.PostingList)
+	l.activeTxns = make(map[uint64]struct{})
+	l.plist = new(intern.PostingList)
+
+	// Iterates from highest Ts to lowest Ts
+	for it.Valid() {
+		item := it.Item()
+		if item.IsDeletedOrExpired() {
+			// Don't consider any more versions.
+			break
+		}
+		if !bytes.Equal(item.Key(), l.key) {
+			break
+		}
+		if l.commitTs == 0 {
+			l.commitTs = item.Version()
+		}
+
+		val, err := item.Value()
+		if err != nil {
+			return nil, err
+		}
+		if item.UserMeta()&BitCompletePosting > 0 {
+			if err := unmarshalOrCopy(l.plist, item); err != nil {
+				return nil, err
+			}
+			l.minTs = item.Version()
+			// No need to do Next here. The outer loop can take care of skipping more versions of
+			// the same key.
+			break
+		}
+		if item.UserMeta()&BitDeltaPosting > 0 {
+			pl := &intern.PostingList{}
+			x.Check(pl.Unmarshal(val))
+			pl.Commit = item.Version()
+			for _, mpost := range pl.Postings {
+				// commitTs, startTs are meant to be only in memory, not
+				// stored on disk.
+				mpost.CommitTs = item.Version()
+			}
+			l.mutationMap[pl.Commit] = pl
+		} else {
+			x.Fatalf("unexpected meta: %d", item.UserMeta())
+		}
+		if item.DiscardEarlierVersions() {
+			break
+		}
+		it.Next()
+	}
+	return l, nil
+}
+
+// Rollup generates a new PostingList by merging the immutable layer with the deltas.
+func (l *List) Rollup() (*intern.PostingList, error) {
+	l.RLock()
+	defer l.RUnlock()
+	// l.AssertLock()
+	final := new(intern.PostingList)
+	var bp bp128.BPackEncoder
+	buf := make([]uint64, 0, bp128.BlockSize)
+
+	maxCommitTs := l.commitTs
+	// Pick all committed entries
+	// TODO: Do we need this assert here?
+	// x.AssertTrue(l.minTs <= l.commitTs)
+	err := l.iterate(math.MaxUint64, 0, func(p *intern.Posting) bool {
+		// iterate only returns entries committed at or below the read timestamp passed to it
+		// (math.MaxUint64 here, so all committed entries are included). So, we don't need to do
+		// any filtering here. In fact, doing filtering here could result in a bug.
+		buf = append(buf, p.Uid)
+		if len(buf) == bp128.BlockSize {
+			bp.PackAppend(buf)
+			buf = buf[:0]
+		}
+
+		// We want to add the posting if it has facets, a label, or a value (i.e., it's not a plain UID ref).
+		if p.Facets != nil || p.PostingType != intern.Posting_REF || len(p.Label) != 0 {
+			// I think it's okay to take the pointer from the iterator, because we have a lock
+			// over List; which won't be released until final has been marshalled. Thus, the
+			// underlying data wouldn't be changed.
+			final.Postings = append(final.Postings, p)
+		}
+		maxCommitTs = x.Max(maxCommitTs, p.CommitTs)
+		return true
+	})
+	if err != nil {
+		return nil, err
+	}
+	if len(buf) > 0 {
+		bp.PackAppend(buf)
+	}
+	if sz := bp.Size(); sz > 0 {
+		final.Uids = make([]byte, sz)
+		// TODO: Add bytes method
+		// What does this TODO above mean?
+		bp.WriteTo(final.Uids)
+	}
+	final.Commit = maxCommitTs
+	return final, nil
+	// // Keep all uncommited Entries or postings with commitTs > l.commitTs
+	// // in mutation map. Discard all else.
+	// for startTs, plist := range l.mutationMap {
+	// 	cl := plist.Commit
+	// 	if cl == 0 || cl > l.commitTs {
+	// 		// Keep this.
+	// 	} else {
+	// 		delete(l.mutationMap, startTs)
+	// 	}
+	// }
+
+	// l.minTs = l.commitTs
+	// l.plist = final
+	// l.numCommits = 0
+	// atomic.StoreInt32(&l.estimatedSize, l.calculateSize())
+	// return nil
+}
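+
+// A minimal sketch of how the result of Rollup gets persisted as a complete posting list,
+// mirroring what worker/rollup.go does in this change (error handling elided):
+//
+//	pl, err := l.Rollup()
+//	data, meta := MarshalPostingList(pl)
+//	// The complete list is written at the highest commit ts it covers.
+//	kv := &intern.KV{Key: l.key, Val: data, Version: pl.Commit, UserMeta: []byte{meta}}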
+
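+// scanKeysToRoll does a key-only scan over Badger at snapshotTs and sends out, in batches, the
+// keys whose latest version at snapshotTs is still a delta and which therefore need a rollup.
+// Schema keys are skipped.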
+func scanKeysToRoll(snapshotTs uint64, keyChan chan *intern.KVS) error {
+	txn := pstore.NewTransactionAt(snapshotTs, false)
+	defer txn.Discard()
+
+	opts := badger.DefaultIteratorOptions
+	opts.AllVersions = false
+	opts.PrefetchValues = false
+
+	itr := txn.NewIterator(opts)
+	defer itr.Close()
+
+	kvs := new(intern.KVS)
+	// We just pick up the first version of each key. If it is already complete, we skip.
+	// If it is a delta, we add that to KVS, to be rolled up.
+	for itr.Rewind(); itr.Valid(); itr.Next() {
+		item := itr.Item()
+		pk := x.Parse(item.Key())
+		if pk.IsSchema() {
+			// Skip schema.
+			continue
+		}
+		if item.UserMeta()&BitCompletePosting > 0 {
+			// First version is complete. Therefore, skip.
+
+		} else if item.UserMeta()&BitDeltaPosting > 0 {
+			kvs.Kv = append(kvs.Kv, &intern.KV{Key: item.KeyCopy(nil)})
+			if len(kvs.Kv) >= 100 {
+				keyChan <- kvs
+				kvs = new(intern.KVS)
+			}
+
+		} else {
+			x.Fatalf("unexpected meta: %d", item.UserMeta())
+		}
+	}
+	if len(kvs.Kv) > 0 {
+		keyChan <- kvs
+	}
+	return nil
+}
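+
+// A possible consumer of scanKeysToRoll (illustrative only; it is not wired up in this change):
+//
+//	keyChan := make(chan *intern.KVS, 100)
+//	go func() {
+//		defer close(keyChan)
+//		x.Check(scanKeysToRoll(snapshotTs, keyChan))
+//	}()
+//	for kvs := range keyChan {
+//		for _, kv := range kvs.Kv {
+//			// Re-read kv.Key with an AllVersions iterator, build the list with ReadPostingList,
+//			// call Rollup on it, and write the result back (as worker/rollup.go does).
+//		}
+//	}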
diff --git a/worker/draft.go b/worker/draft.go
index 2c8fa9bb..8b653a9f 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -389,7 +389,11 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
 			snap.Index, snap.MinPendingStartTs)
 		data, err := snap.Marshal()
 		x.Check(err)
-		return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
+		if err := n.Store.CreateSnapshot(snap.Index, n.ConfState(), data); err != nil {
+			return err
+		}
+		// Now roll up all posting lists.
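+		// Using MinPendingStartTs - 1 should keep the rollup below any transaction that is still pending.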
+		return rollupPostingLists(snap.MinPendingStartTs - 1)
 
 	} else {
 		x.Fatalf("Unknown proposal")
diff --git a/worker/predicate.go b/worker/predicate.go
index e67541f8..f589f918 100644
--- a/worker/predicate.go
+++ b/worker/predicate.go
@@ -233,7 +233,7 @@ func (w *grpcWorker) PredicateAndSchemaData(m *intern.SnapshotMeta, stream inter
 	}
 
 	sl := streamLists{stream: stream, db: pstore}
-	sl.chooseKey = func(key []byte, version uint64) bool {
+	sl.chooseKey = func(key []byte, version uint64, _ byte) bool {
 		pk := x.Parse(key)
 		return version > clientTs || pk.IsSchema()
 	}
diff --git a/worker/rollup.go b/worker/rollup.go
new file mode 100644
index 00000000..24d7379c
--- /dev/null
+++ b/worker/rollup.go
@@ -0,0 +1,81 @@
+package worker
+
+import (
+	"sync"
+
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/protos/intern"
+	"github.com/dgraph-io/dgraph/x"
+	"golang.org/x/net/context"
+)
+
+// writeToStore implements the same Send interface that streamLists uses, but writes the KVs
+// back to Badger instead of streaming them out.
+type writeToStore struct{}
+
+func (ws *writeToStore) Send(kvs *intern.KVS) error {
+	var wg sync.WaitGroup
+	errCh := make(chan error, 1)
+	var count int
+	for _, kv := range kvs.GetKv() {
+		if kv.Version == 0 {
+			continue
+		}
+		txn := pstore.NewTransactionAt(kv.Version, true)
+		x.Check(txn.SetWithDiscard(kv.Key, kv.Val, kv.UserMeta[0]))
+		wg.Add(1)
+		err := txn.CommitAt(kv.Version, func(err error) {
+			defer wg.Done()
+			if err != nil {
+				x.Printf("Error while writing list to Badger: %v\n", err)
+				select {
+				case errCh <- err:
+				default:
+				}
+			}
+		})
+		if err != nil {
+			return err
+		}
+		count++
+	}
+	wg.Wait()
+	x.Printf("During snapshot, wrote %d keys to disk\n", count)
+	select {
+	case err := <-errCh:
+		return err
+	default:
+		return nil
+	}
+}
+
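+// rollupPostingLists streams over all posting lists which still have deltas at or below
+// snapshotTs, rolls each of them up via List.Rollup, and writes the resulting complete lists
+// back to Badger via writeToStore.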
+func rollupPostingLists(snapshotTs uint64) error {
+	var ws writeToStore
+	sl := streamLists{stream: &ws, db: pstore}
+	sl.chooseKey = func(_ []byte, _ uint64, userMeta byte) bool {
+		return userMeta&posting.BitDeltaPosting > 0
+	}
+
+	// We own the key byte slice provided.
+	sl.itemToKv = func(key []byte, itr *badger.Iterator) (*intern.KV, error) {
+		l, err := posting.ReadPostingList(key, itr)
+		if err != nil {
+			return nil, err
+		}
+		pl, err := l.Rollup()
+		if err != nil {
+			return nil, err
+		}
+		// TODO: We can update the in-memory version of posting list here.
+		data, meta := posting.MarshalPostingList(pl)
+		kv := &intern.KV{
+			Key:      key,
+			Val:      data,
+			Version:  pl.Commit,
+			UserMeta: []byte{meta},
+		}
+		return kv, nil
+	}
+
+	return sl.orchestrate(context.Background(), "", snapshotTs)
+}
diff --git a/worker/stream_lists.go b/worker/stream_lists.go
index a9f9c755..52fe12c3 100644
--- a/worker/stream_lists.go
+++ b/worker/stream_lists.go
@@ -27,7 +27,7 @@ type streamLists struct {
 	stream    kvStream
 	predicate string
 	db        *badger.ManagedDB
-	chooseKey func(key []byte, version uint64) bool
+	chooseKey func(key []byte, version uint64, userMeta byte) bool
 	itemToKv  func(key []byte, itr *badger.Iterator) (*intern.KV, error)
 }
 
@@ -148,7 +148,7 @@ func (sl *streamLists) produceKVs(ctx context.Context, ts uint64,
 				break
 			}
 			// Check if we should pick this key.
-			if sl.chooseKey != nil && !sl.chooseKey(item.Key(), item.Version()) {
+			if sl.chooseKey != nil && !sl.chooseKey(item.Key(), item.Version(), item.UserMeta()) {
 				continue
 			}
 
-- 
GitLab