Unverified Commit 0a184ee4 authored by Manish R Jain

Bugfix: RAFT Snapshot causing leadership failures.

- Fix the bug where a restart from a valid snapshot would cause the node to not participate in elections, making Dgraph immutable. We now keep track of the conf state, which is passed in when creating a new snapshot.
- Identify the variables in the node struct which can change after initialization, or are initialized later, and put all accesses to them behind mutex locks.
parent 98674879
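The gist of the first bullet, as the hunks below show, is that the ConfState returned by ApplyConfChange is now remembered and passed to MemoryStorage.CreateSnapshot instead of nil. A minimal, self-contained sketch of that pattern, assuming the etcd raft packages Dgraph vendors (github.com/coreos/etcd/raft and its raftpb; exact import paths may differ), with confTracker, applyConfChange and snapshot as illustrative names rather than Dgraph's own:

package main

import (
	"sync"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// confTracker mirrors the new _confState field: the ConfState returned by
// ApplyConfChange is the authoritative membership at that point in the log.
type confTracker struct {
	sync.RWMutex
	cs *raftpb.ConfState
}

func (c *confTracker) Set(cs *raftpb.ConfState) {
	c.Lock()
	defer c.Unlock()
	c.cs = cs
}

func (c *confTracker) Get() *raftpb.ConfState {
	c.RLock()
	defer c.RUnlock()
	return c.cs
}

// applyConfChange records the membership that raft reports once a conf change commits.
func applyConfChange(rn raft.Node, ct *confTracker, cc raftpb.ConfChange) {
	ct.Set(rn.ApplyConfChange(cc))
}

// snapshot embeds the tracked ConfState into the snapshot. Passing nil here was
// the original bug: a node restarting from such a snapshot sees an empty peer
// set, so it neither campaigns nor votes, and the group loses its leader.
func snapshot(store *raft.MemoryStorage, ct *confTracker, upto uint64, data []byte) (raftpb.Snapshot, error) {
	return store.CreateSnapshot(upto, ct.Get(), data)
}

func main() {}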
@@ -90,10 +90,10 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) error {
var err error
lid, _ := groups().Leader(gid)
n := groups().Node(gid)
- if n != nil && lid == 0 {
+ if n != nil {
// This is useful for testing, when the membership information doesn't have chance
// to propagate.
- lid = n.raft.Status().Lead
+ lid = n.Raft().Status().Lead
}
if n != nil && n.id == lid {
@@ -104,6 +104,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) error {
}
} else {
x.Trace(ctx, "Not leader of group: %d. Sending to: %d", gid, lid)
+ _, addr := groups().Leader(gid)
p := pools().get(addr)
conn, err := p.Get()
......
@@ -19,6 +19,8 @@ import (
"github.com/dgraph-io/dgraph/x"
)
+ // peerPool stores the peers per node and the addresses corresponding to them.
+ // We then use pool() to get an active connection to those addresses.
type peerPool struct {
sync.RWMutex
peers map[uint64]string
@@ -69,7 +71,13 @@ type sendmsg struct {
}
type node struct {
canCampaign bool
+ x.SafeMutex
+ // SafeMutex is for fields which can be changed after init.
+ _confState *raftpb.ConfState
+ _raft raft.Node
+ // Fields which are never changed after init.
cfg *raft.Config
commitCh chan raftpb.Entry
ctx context.Context
@@ -79,16 +87,47 @@ type node struct {
messages chan sendmsg
peers peerPool
props proposals
- raft raft.Node
raftContext *task.RaftContext
store *raft.MemoryStorage
wal *raftwal.Wal
canCampaign bool
// applied is used to keep track of the applied RAFT proposals.
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to RocksDB).
applied x.WaterMark
}
+ // SetRaft would set the provided raft.Node to this node.
+ // It would check fail if the node is already set.
+ func (n *node) SetRaft(r raft.Node) {
+ n.Lock()
+ defer n.Unlock()
+ x.AssertTrue(n._raft == nil)
+ n._raft = r
+ }
+ // Raft would return back the raft.Node stored in the node.
+ func (n *node) Raft() raft.Node {
+ n.RLock()
+ defer n.RUnlock()
+ return n._raft
+ }
+ // SetConfState would store the latest ConfState generated by ApplyConfChange.
+ func (n *node) SetConfState(cs *raftpb.ConfState) {
+ n.Lock()
+ defer n.Unlock()
+ n._confState = cs
+ }
+ // ConfState would return the latest ConfState stored in node.
+ func (n *node) ConfState() *raftpb.ConfState {
+ n.RLock()
+ defer n.RUnlock()
+ return n._confState
+ }
func newNode(gid uint32, id uint64, myAddr string) *node {
fmt.Printf("NEW NODE GID, ID: [%v, %v]\n", gid, id)
@@ -125,7 +164,7 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
raftContext: rc,
messages: make(chan sendmsg, 1000),
}
- n.applied = x.WaterMark{Name: "Committed Mark"}
+ n.applied = x.WaterMark{Name: fmt.Sprintf("Committed Mark: Group %d", n.gid)}
n.applied.Init()
return n
@@ -156,7 +195,7 @@ func (n *node) AddToCluster(ctx context.Context, pid uint64) error {
}
rcBytes, err := rc.Marshal()
x.Check(err)
- return n.raft.ProposeConfChange(ctx, raftpb.ConfChange{
+ return n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: pid,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
@@ -192,7 +231,7 @@ var slicePool = sync.Pool{
}
func (n *node) ProposeAndWait(ctx context.Context, proposal *task.Proposal) error {
- if n.raft == nil {
+ if n.Raft() == nil {
return x.Errorf("RAFT isn't initialized yet")
}
@@ -214,7 +253,7 @@ func (n *node) ProposeAndWait(ctx context.Context, proposal *task.Proposal) erro
che := make(chan error, 1)
n.props.Store(proposal.Id, che)
- if err = n.raft.Propose(ctx, proposalData); err != nil {
+ if err = n.Raft().Propose(ctx, proposalData); err != nil {
return x.Wrapf(err, "While proposing")
}
@@ -245,7 +284,7 @@ func (n *node) send(m raftpb.Message) {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
default:
- log.Fatalf("Unable to push messages to channel in send")
+ x.Fatalf("Unable to push messages to channel in send")
}
}
@@ -378,7 +417,8 @@ func (n *node) processCommitCh() {
n.Connect(rc.Id, rc.Addr)
}
- n.raft.ApplyConfChange(cc)
+ cs := n.Raft().ApplyConfChange(cc)
+ n.SetConfState(cs)
n.applied.Ch <- x.Mark{Index: e.Index, Done: true}
} else {
@@ -432,19 +472,20 @@ func (n *node) Run() {
firstRun := true
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
+ rcBytes, err := n.raftContext.Marshal()
+ x.Check(err)
for {
select {
case <-ticker.C:
- n.raft.Tick()
+ n.Raft().Tick()
- case rd := <-n.raft.Ready():
+ case rd := <-n.Raft().Ready():
x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
x.Check(n.wal.Store(n.gid, rd.HardState, rd.Entries))
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
- rcBytes, err := n.raftContext.Marshal()
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
- x.Check(err)
@@ -479,9 +520,9 @@
n.applied.Ch <- status
}
- n.raft.Advance()
+ n.Raft().Advance()
if firstRun && n.canCampaign {
- go n.raft.Campaign(n.ctx)
+ go n.Raft().Campaign(n.ctx)
firstRun = false
}
@@ -496,7 +537,7 @@ func (n *node) Stop() {
}
func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
- return n.raft.Step(ctx, msg)
+ return n.Raft().Step(ctx, msg)
}
func (n *node) snapshotPeriodically() {
@@ -537,7 +578,8 @@ func (n *node) snapshotPeriodically() {
rc, err := n.raftContext.Marshal()
x.Check(err)
- s, err := n.store.CreateSnapshot(le, nil, rc)
+ s, err := n.store.CreateSnapshot(le, n.ConfState(), rc)
+ x.Checkf(err, "While creating snapshot")
x.Checkf(n.store.Compact(le), "While compacting snapshot")
x.Check(n.wal.StoreSnapshot(n.gid, s))
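To see why the nil ConfState caused leadership failures, consider the restart path: the snapshot kept in the wal is applied to raft's MemoryStorage, and a restarted node recovers its peer set from Snapshot.Metadata.ConfState. A self-contained sketch of that path under the same etcd raft assumption; restartFromSnapshot and the Config values are illustrative, not Dgraph's actual settings:

package main

import (
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// restartFromSnapshot shows where the ConfState embedded in a snapshot is
// consumed. With the old code the snapshot carried a nil ConfState, so
// snap.Metadata.ConfState.Nodes is empty after a restart: the node knows no
// peers, never campaigns or grants votes, and the group cannot elect a leader.
func restartFromSnapshot(id uint64, snap raftpb.Snapshot) raft.Node {
	store := raft.NewMemoryStorage()
	if err := store.ApplySnapshot(snap); err != nil {
		panic(err)
	}
	cfg := &raft.Config{
		ID:              id,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         store,
		MaxSizePerMsg:   4096,
		MaxInflightMsgs: 256,
	}
	return raft.RestartNode(cfg)
}

func main() {}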
@@ -625,16 +667,16 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
if restart {
fmt.Printf("RESTARTING\n")
- n.raft = raft.RestartNode(n.cfg)
+ n.SetRaft(raft.RestartNode(n.cfg))
} else {
if groups().HasPeer(n.gid) {
n.joinPeers()
- n.raft = raft.StartNode(n.cfg, nil)
+ n.SetRaft(raft.StartNode(n.cfg, nil))
} else {
peers := []raft.Peer{{ID: n.id}}
- n.raft = raft.StartNode(n.cfg, peers)
+ n.SetRaft(raft.StartNode(n.cfg, peers))
// Trigger election, so this node can become the leader of this single-node cluster.
n.canCampaign = true
}
@@ -648,16 +690,18 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
}
func (n *node) AmLeader() bool {
- if n == nil || n.raft == nil {
+ if n.Raft() == nil {
return false
}
- return n.raft.Status().Lead == n.raft.Status().ID
+ r := n.Raft()
+ return r.Status().Lead == r.Status().ID
}
func (w *grpcWorker) applyMessage(ctx context.Context, msg raftpb.Message) error {
var rc task.RaftContext
x.Check(rc.Unmarshal(msg.Context))
node := groups().Node(rc.Group)
+ // TODO: Handle the case where node isn't present for this group.
node.Connect(msg.From, rc.Addr)
c := make(chan error, 1)
......
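The second bullet is the accessor pattern above: _raft and _confState can be assigned after the node is constructed (InitAndStartNode runs later, conf changes arrive at any time), so every read goes through a lock. A self-contained sketch of the same idea using the standard library's sync.RWMutex in place of x.SafeMutex; lazyRaft, Set and Get are illustrative names:

package main

import (
	"fmt"
	"sync"

	"github.com/coreos/etcd/raft"
)

// lazyRaft guards a raft.Node that is only assigned after initialization, the
// way node.SetRaft and node.Raft do. Readers (for example, RPC handlers calling
// AmLeader or ProposeAndWait) may run concurrently with the goroutine that
// eventually calls Set, so an unguarded field would be a data race.
type lazyRaft struct {
	mu sync.RWMutex
	rn raft.Node
}

// Set stores the raft.Node exactly once; a second call panics, mirroring the
// x.AssertTrue check in SetRaft.
func (l *lazyRaft) Set(rn raft.Node) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.rn != nil {
		panic("raft.Node is already set")
	}
	l.rn = rn
}

// Get returns the raft.Node, or nil if it has not been set yet. Callers are
// expected to handle nil, as ProposeAndWait and AmLeader now do.
func (l *lazyRaft) Get() raft.Node {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.rn
}

func main() {
	var lr lazyRaft
	// Before initialization, a reader sees nil instead of racing on the field.
	fmt.Println("raft initialized:", lr.Get() != nil)
}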
@@ -181,6 +181,7 @@ func (g *groupi) HasPeer(group uint32) bool {
func (g *groupi) Leader(group uint32) (uint64, string) {
g.RLock()
defer g.RUnlock()
all := g.all[group]
if all == nil {
return 0, ""
......