Skip to content
Snippets Groups Projects
Unverified Commit b1b69b32 authored by Pawan Rawal's avatar Pawan Rawal
Browse files

Dont crash if initial snapshot retrieval fails

parent 2fade242
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package worker
import (
"encoding/binary"
"fmt"
"math/rand"
"sync"
"time"
......@@ -413,10 +414,10 @@ func (n *node) applyAllMarks(ctx context.Context) {
n.Applied.WaitForMark(ctx, lastIndex)
}
func (n *node) retrieveSnapshot() {
func (n *node) retrieveSnapshot() error {
pool := groups().Leader(groups().groupId())
if pool == nil {
x.Fatalf("Unable to reach leader in group %d", n.gid)
return fmt.Errorf("Unable to reach leader in group %d", n.gid)
}
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
......@@ -431,12 +432,15 @@ func (n *node) retrieveSnapshot() {
if _, err := n.populateShard(pstore, pool); err != nil {
// TODO: We definitely don't want to just fall flat on our face if we can't
// retrieve a simple snapshot.
x.Fatalf("Cannot retrieve snapshot from peer, error: %v\n", err)
return fmt.Errorf("Cannot retrieve snapshot from peer, error: %v\n", err)
}
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
x.Checkf(schema.LoadFromDb(), "Error while initilizating schema")
if err := schema.LoadFromDb(); err != nil {
return fmt.Errorf("Error while initilizating schema: %+v\n", err)
}
groups().triggerMembershipSync()
return nil
}
func (n *node) Run() {
......@@ -490,7 +494,7 @@ func (n *node) Run() {
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
// It's ok to block tick while retrieving snapshot, since it's a follower
n.retrieveSnapshot()
x.Checkf(n.retrieveSnapshot(), "While retrieving snapshot")
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
} else {
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
......@@ -669,7 +673,14 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
if _, hasPeer := groups().MyPeer(); hasPeer {
// Get snapshot before joining peers as it can take time to retrieve it and we dont
// want the quorum to be inactive when it happens.
n.retrieveSnapshot()
err := n.retrieveSnapshot()
count := 1
for err != nil && count <= 5 {
x.Printf("While retrieving snapshot, err: %v", err)
err = n.retrieveSnapshot()
count++
time.Sleep(time.Second)
}
n.joinPeers()
n.SetRaft(raft.StartNode(n.Cfg, nil))
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment