Skip to content
Snippets Groups Projects
Unverified Commit ef3309a3 authored by Manish R Jain's avatar Manish R Jain
Browse files

Instead of discarding all versions anytime, we now only discard versions which...

Instead of discarding all versions anytime, we now only discard versions which are below the timestamp used by snapshot. Thus, we keep all versions of keys after the last snapshot.
parent f048c675
No related branches found
No related tags found
No related merge requests found
......@@ -308,10 +308,10 @@ func (s *levelsController) compactBuildTables(
it.Rewind()
// Pick up the currently pending transactions' min readTs, so we can discard versions below this
// readTs. We should never discard any versions starting from above this timestamp, because that
// would affect the snapshot view guarantee provided by transactions.
minReadTs := s.kv.orc.readMark.MinReadTs()
// Pick a discard ts, so we can discard versions below this ts. We should
// never discard any versions starting from above this timestamp, because
// that would affect the snapshot view guarantee provided by transactions.
discardTs := s.kv.orc.discardAtOrBelow()
// Start generating new tables.
type newTableResult struct {
......@@ -350,7 +350,7 @@ func (s *levelsController) compactBuildTables(
vs := it.Value()
version := y.ParseTs(it.Key())
if version <= minReadTs {
if version <= discardTs {
// Keep track of the number of versions encountered for this key. Only consider the
// versions which are below the minReadTs, otherwise, we might end up discarding the
// only valid version for a running transaction.
......
......@@ -77,3 +77,10 @@ func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
func (db *ManagedDB) GetSequence(_ []byte, _ uint64) (*Sequence, error) {
panic("Cannot use GetSequence for ManagedDB.")
}
// SetDiscardTs sets a timestamp at or below which, any invalid or deleted
// versions can be discarded from the LSM tree, and thence from the value log to
// reclaim disk space.
func (db *ManagedDB) SetDiscardTs(ts uint64) {
db.orc.setDiscardTs(ts)
}
......@@ -40,7 +40,10 @@ type oracle struct {
writeLock sync.Mutex
nextCommit uint64
readMark y.WaterMark
// Either of these is used to determine which versions can be permanently
// discarded during compaction.
discardTs uint64 // Used by ManagedDB.
readMark y.WaterMark // Used by DB.
// commits stores a key fingerprint and latest commit counter for it.
// refCount is used to clear out commits map to avoid a memory blowup.
......@@ -81,6 +84,23 @@ func (o *oracle) commitTs() uint64 {
return o.nextCommit
}
// Any deleted or invalid versions at or below ts would be discarded during
// compaction to reclaim disk space in LSM tree and thence value log.
func (o *oracle) setDiscardTs(ts uint64) {
o.Lock()
defer o.Unlock()
o.discardTs = ts
}
func (o *oracle) discardAtOrBelow() uint64 {
if o.isManaged {
o.Lock()
defer o.Unlock()
return o.discardTs
}
return o.readMark.MinReadTs()
}
// hasConflict must be called while having a lock.
func (o *oracle) hasConflict(txn *Txn) bool {
if len(txn.reads) == 0 {
......
......@@ -163,52 +163,52 @@
"revisionTime": "2016-09-07T16:21:46Z"
},
{
"checksumSHA1": "2H+tsKY88Anb6Ys9Ls8X9NVmSEc=",
"checksumSHA1": "1qvmTlilsJCsxarfPoawlGF7jQw=",
"path": "github.com/dgraph-io/badger",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=",
"path": "github.com/dgraph-io/badger/options",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "gGTDnTVVw5kcT2P5NXZV1YSckOU=",
"path": "github.com/dgraph-io/badger/protos",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=",
"path": "github.com/dgraph-io/badger/skl",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "I33KkP2lnYqJDasvvsAlebzkeko=",
"path": "github.com/dgraph-io/badger/table",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "v2pJQ5NbS034cLP+GM1WLlGnByY=",
"path": "github.com/dgraph-io/badger/y",
"revision": "391b6d3b93e6014fe8c2971fcc0c1266e47dbbd9",
"revisionTime": "2018-07-11T21:59:47Z",
"version": "v1.5.3",
"versionExact": "v1.5.3"
"revision": "937af5f844102c2849b070cdb0ec33d82aa97cc3",
"revisionTime": "2018-07-11T22:35:56Z",
"version": "HEAD",
"versionExact": "HEAD"
},
{
"checksumSHA1": "a29TtOU87eZA0S6wL+rAkpqUEzc=",
......
......@@ -389,6 +389,8 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
snap.Index, snap.MinPendingStartTs)
data, err := snap.Marshal()
x.Check(err)
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.MinPendingStartTs - 1)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment