Skip to content
Snippets Groups Projects
Unverified Commit d6a776a2 authored by Janardhan Reddy's avatar Janardhan Reddy
Browse files

evict pl's from memory while populating shard

parent 40eb5c16
No related branches found
No related tags found
No related merge requests found
...@@ -453,6 +453,18 @@ func CommitLists(numRoutines int) { ...@@ -453,6 +453,18 @@ func CommitLists(numRoutines int) {
wg.Wait() wg.Wait()
} }
// EvictAll removes all pl's stored in memory
func EvictAll(numRoutines int) {
stopTheWorld.Lock()
defer stopTheWorld.Unlock()
CommitLists(numRoutines)
lhmap.EachWithDelete(func(k uint64, l *List) {
l.SetForDeletion()
l.decr()
})
}
// The following logic is used to batch up all the writes to RocksDB. // The following logic is used to batch up all the writes to RocksDB.
type syncEntry struct { type syncEntry struct {
key []byte key []byte
......
...@@ -516,6 +516,14 @@ func (n *node) retrieveSnapshot(rc task.RaftContext) { ...@@ -516,6 +516,14 @@ func (n *node) retrieveSnapshot(rc task.RaftContext) {
x.AssertTruef(pool != nil, "Pool shouldn't be nil for address: %v for id: %v", addr, rc.Id) x.AssertTruef(pool != nil, "Pool shouldn't be nil for address: %v for id: %v", addr, rc.Id)
x.AssertTrue(rc.Group == n.gid) x.AssertTrue(rc.Group == n.gid)
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
// Safe to keep this line
n.syncAllMarks(n.ctx)
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
posting.EvictAll(10)
x.Check2(populateShard(n.ctx, pool, n.gid)) x.Check2(populateShard(n.ctx, pool, n.gid))
x.Checkf(schema.LoadFromDb(), "Error while initilizating schema") x.Checkf(schema.LoadFromDb(), "Error while initilizating schema")
} }
...@@ -668,8 +676,10 @@ func (n *node) joinPeers() { ...@@ -668,8 +676,10 @@ func (n *node) joinPeers() {
x.AssertTruef(pool != nil, "Unable to get pool for addr: %q for peer: %d", paddr, pid) x.AssertTruef(pool != nil, "Unable to get pool for addr: %q for peer: %d", paddr, pid)
// Bring the instance up to speed first. // Bring the instance up to speed first.
_, err := populateShard(n.ctx, pool, n.gid) // Raft would decide whether snapshot needs to fetched or not
x.Checkf(err, "Error while populating shard") // so populateShard is not needed
// _, err := populateShard(n.ctx, pool, n.gid)
// x.Checkf(err, "Error while populating shard")
conn, err := pool.Get() conn, err := pool.Get()
x.Check(err) x.Check(err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment