Skip to content
Snippets Groups Projects
Unverified Commit b2ee1f74 authored by Pawan Rawal's avatar Pawan Rawal
Browse files

Don't log.Fatal while joining peers. Retry indefinitely.

parent 2c63353d
No related branches found
No related tags found
No related merge requests found
......@@ -251,6 +251,9 @@ func Get(key []byte) (rlist *List, err error) {
// Any initialization for l must be done before PutIfMissing. Once it's added
// to the map, any other goroutine can retrieve it.
l, err := getNew(key, pstore)
if err != nil {
return nil, err
}
// We are always going to return lp to caller, whether it is l or not
lp = lcache.PutIfMissing(string(key), l)
if lp != l {
......@@ -258,7 +261,7 @@ func Get(key []byte) (rlist *List, err error) {
} else if atomic.LoadInt32(&l.onDisk) == 0 {
btree.Insert(l.key)
}
return lp, err
return lp, nil
}
// GetLru checks the lru map and returns the list if it exists
......
......@@ -622,31 +622,25 @@ func (n *node) snapshot(skip uint64) {
x.Check(n.Wal.StoreSnapshot(n.gid, s))
}
// joinPeers looks up the leader entry for this node's group (n.gid) and sends
// it a JoinCluster RPC carrying n.RaftContext, bounded by a one-second timeout
// derived from n.ctx.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost, so
// the pre-change lines (no return value, x.Fatalf, x.Checkf) and the
// post-change lines (error return, x.Errorf) appear interleaved. In the
// post-change version the function returns a non-nil error when no leader is
// found or when JoinCluster fails, and nil otherwise.
func (n *node) joinPeers() {
func (n *node) joinPeers() error {
// Get leader information for MY group.
pl := groups().Leader(n.gid)
if pl == nil {
x.Fatalf("Unable to reach leader or any other server in group %d", n.gid)
return x.Errorf("Unable to reach leader or any other server in group %d", n.gid)
}
// Bring the instance up to speed first.
// Raft would decide whether a snapshot needs to be fetched or not,
// so populateShard is not needed.
// _, err := populateShard(n.ctx, pool, n.gid)
// x.Checkf(err, "Error while populating shard")
// Obtain a connection from pl and wrap it in a Raft RPC client.
gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling JoinCluster")
// Cap the RPC at one second so a blocked JoinCluster cannot hang this call.
ctx, cancel := context.WithTimeout(n.ctx, time.Second)
defer cancel()
// JoinCluster can block indefinitely, raft ignores conf change proposal
// if it has pending configuration.
_, err := c.JoinCluster(ctx, n.RaftContext)
// TODO: This should keep on indefinitely trying to join the cluster, instead of crashing.
x.Checkf(err, "Error while joining cluster")
if _, err := c.JoinCluster(ctx, n.RaftContext); err != nil {
return x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with JoinCluster call\n")
return nil
}
// InitAndStartNode gets called after having at least one membership sync with the cluster.
......@@ -680,7 +674,13 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
count++
time.Sleep(time.Second)
}
n.joinPeers()
for {
if err := n.joinPeers(); err == nil {
break
}
x.Printf("Error while joining peers: %+v. Retrying...\n", err)
time.Sleep(time.Second)
}
n.SetRaft(raft.StartNode(n.Cfg, nil))
} else {
peers := []raft.Peer{{ID: n.Id}}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment