Commit f71dfb54 authored by Manish R Jain

Improve Raft loop based on Etcd's usage and recommendations.

parent f47059df
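The commit reorders the Ready-handling loop in Run() to follow etcd raft's recommended usage: a leader may send its outgoing messages in parallel with writing to disk, the WAL stores entries and HardState before the snapshot, committed entries are scheduled only after persisting, and a follower sends its messages only once everything is on disk. As a rough illustration of that ordering (not Dgraph's code), here is a minimal sketch assuming the coreos/etcd raft package; readyLoop, persist, apply and send are hypothetical hooks standing in for the node's WAL, state machine and transport.

package sketch

import (
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// readyLoop shows the ordering this commit moves Run() towards:
//   1. Leader only: send messages in parallel with writing to disk.
//   2. Persist entries, HardState and snapshot.
//   3. Schedule or apply the committed entries.
//   4. Follower only: send messages, now that everything is persisted.
//   5. Advance.
func readyLoop(
	n raft.Node,
	tick *time.Ticker,
	persist func(hs raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot),
	apply func(ents []raftpb.Entry),
	send func(msgs []raftpb.Message),
) {
	var leader bool
	for {
		select {
		case <-tick.C:
			n.Tick()
		case rd := <-n.Ready():
			if rd.SoftState != nil {
				leader = rd.RaftState == raft.StateLeader
			}
			if leader {
				// Leader can send messages in parallel with writing to disk.
				send(rd.Messages)
			}
			// First store the entries, then the hardstate and snapshot.
			persist(rd.HardState, rd.Entries, rd.Snapshot)
			// Schedule or apply committed entries.
			apply(rd.CommittedEntries)
			if !leader {
				// Followers should send messages only after persisting.
				send(rd.Messages)
			}
			n.Advance()
		}
	}
}

In the diff below, Dgraph's wal.Store/StoreSnapshot plus saveToStorage play the persist role, and applyCh (or applyConfChange) plays the apply role.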
@@ -573,6 +573,22 @@ func (n *node) process(index uint64, proposal *protos.Proposal, pending chan str
 const numPendingMutations = 10000

+func (n *node) applyConfChange(e raftpb.Entry) {
+	var cc raftpb.ConfChange
+	cc.Unmarshal(e.Data)
+	if len(cc.Context) > 0 {
+		var rc protos.RaftContext
+		x.Check(rc.Unmarshal(cc.Context))
+		n.Connect(rc.Id, rc.Addr)
+	}
+	cs := n.Raft().ApplyConfChange(cc)
+	n.SetConfState(cs)
+	n.applied.Done(e.Index)
+	posting.SyncMarkFor(n.gid).Done(e.Index)
+}
+
 func (n *node) processApplyCh() {
 	pending := make(chan struct{}, numPendingMutations)
@@ -584,19 +600,7 @@ func (n *node) processApplyCh() {
 		}
 		if e.Type == raftpb.EntryConfChange {
-			var cc raftpb.ConfChange
-			cc.Unmarshal(e.Data)
-			if len(cc.Context) > 0 {
-				var rc protos.RaftContext
-				x.Check(rc.Unmarshal(cc.Context))
-				n.Connect(rc.Id, rc.Addr)
-			}
-			cs := n.Raft().ApplyConfChange(cc)
-			n.SetConfState(cs)
-			n.applied.Done(e.Index)
-			posting.SyncMarkFor(n.gid).Done(e.Index)
+			n.applyConfChange(e)
 			continue
 		}
@@ -729,17 +733,22 @@ func (n *node) Run() {
 				}
 				leader = rd.RaftState == raft.StateLeader
 			}
-			x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
+			if leader {
+				// Leader can send messages in parallel with writing to disk.
+				for _, msg := range rd.Messages {
+					// NOTE: We can do some optimizations here to drop messages.
+					msg.Context = rcBytes
+					n.send(msg)
+				}
+			}
+			// First store the entries, then the hardstate and snapshot.
 			x.Check(n.wal.Store(n.gid, rd.HardState, rd.Entries))
+			x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
 			// Now store them in the in-memory store.
 			n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
-			for _, msg := range rd.Messages {
-				// NOTE: We can do some optimizations here to drop messages.
-				msg.Context = rcBytes
-				n.send(msg)
-			}
 			if !raft.IsEmptySnap(rd.Snapshot) {
 				// We don't send snapshots to other nodes. But, if we get one, that means
 				// either the leader is trying to bring us up to state; or this is the
@@ -748,6 +757,8 @@ func (n *node) Run() {
 				x.Check(rc.Unmarshal(rd.Snapshot.Data))
 				x.AssertTrue(rc.Group == n.gid)
 				if rc.Id != n.id {
+					// NOTE: Retrieving snapshot here is OK, after storing it above in WAL, because
+					// rc.Id != n.id.
 					x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
 					n.retrieveSnapshot(rc.Id)
 					x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
@@ -761,6 +772,7 @@ func (n *node) Run() {
 				}
 			}

+			// Now schedule or apply committed entries.
 			for _, entry := range rd.CommittedEntries {
 				// Need applied watermarks for schema mutation also for read linearizability.
 				// Applied watermarks need to be emitted as soon as possible sequentially.
@@ -771,10 +783,23 @@ func (n *node) Run() {
 				n.applied.Begin(entry.Index)
 				posting.SyncMarkFor(n.gid).Begin(entry.Index)

-				// Just queue up to be processed. Don't wait on them.
-				n.applyCh <- entry
+				if !leader && entry.Type == raftpb.EntryConfChange {
+					// Config changes in followers must be applied straight away.
+					n.applyConfChange(entry)
+				} else {
+					// Just queue up to be processed. Don't wait on them.
+					n.applyCh <- entry
+				}
 			}

+			if !leader {
+				// Followers should send messages later.
+				for _, msg := range rd.Messages {
+					// NOTE: We can do some optimizations here to drop messages.
+					msg.Context = rcBytes
+					n.send(msg)
+				}
+			}
 			n.Raft().Advance()
 			if firstRun && n.canCampaign {
 				go n.Raft().Campaign(n.ctx)
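For context on the new applyConfChange helper: it expects a marshalled protos.RaftContext inside cc.Context, so whoever proposes the membership change has to put the joining peer's id and address there. Below is a minimal sketch of how such a proposal could look, again assuming the coreos/etcd raft package; raftContext, marshal details and proposeAddNode are hypothetical stand-ins (protos.RaftContext is protobuf-encoded in Dgraph, JSON is used here only to keep the sketch self-contained).

package sketch

import (
	"context"
	"encoding/json"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// raftContext stands in for protos.RaftContext: the joining peer's raft id,
// group and address. Field names mirror what applyConfChange reads (rc.Id, rc.Addr).
type raftContext struct {
	Id    uint64
	Group uint32
	Addr  string
}

// proposeAddNode proposes adding a new peer to the group. The marshalled
// context travels inside the conf-change entry, so every replica that applies
// it can Connect(rc.Id, rc.Addr) before calling ApplyConfChange, which is
// exactly what applyConfChange does in the diff above.
func proposeAddNode(ctx context.Context, n raft.Node, rc raftContext) error {
	data, err := json.Marshal(rc) // Dgraph uses protobuf here; JSON keeps the sketch self-contained.
	if err != nil {
		return err
	}
	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  rc.Id,
		Context: data,
	}
	return n.ProposeConfChange(ctx, cc)
}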