Unverified Commit 0c0e0424 authored by Manish R Jain

RAFT Snapshotting

- Use synced watermarks for snapshotting (a minimal sketch of this idea follows the commit metadata below).
- Ensure that snapshots work when a node is restarted.
- Also ensure they work when the leader passes a snapshot to a follower, which happens when the follower needs to be brought up to the leader's state.
- Fix a bug with UID assignment.
parent b08e0e53
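
To illustrate the watermark-based approach from the first bullet above, here is a minimal, standalone sketch. It is not part of this commit: WaterMark and raftStore are hypothetical stand-ins for x.WaterMark and the etcd raft storage used in the diff below. The idea is that a snapshot is only taken up to the highest RAFT index whose writes are known to be synced, and is skipped when that watermark has not advanced past the existing snapshot.

package main

import "fmt"

// WaterMark tracks the highest RAFT index whose writes have been synced to disk.
type WaterMark struct{ doneUntil uint64 }

func (w *WaterMark) DoneUntil() uint64 { return w.doneUntil }

// raftStore is a stand-in exposing only what this sketch needs.
type raftStore struct{ snapshotIndex uint64 }

func (s *raftStore) SnapshotIndex() uint64 { return s.snapshotIndex }

func (s *raftStore) CreateSnapshot(idx uint64) { s.snapshotIndex = idx }

// maybeSnapshot snapshots only up to the synced watermark, so entries whose
// effects are not yet durable are never discarded from the log.
func maybeSnapshot(w *WaterMark, s *raftStore) {
    le := w.DoneUntil()
    if le <= s.SnapshotIndex() {
        fmt.Printf("watermark %d <= snapshot %d. Skipping.\n", le, s.SnapshotIndex())
        return
    }
    s.CreateSnapshot(le)
    fmt.Printf("snapshot taken at index %d\n", le)
}

func main() {
    w := &WaterMark{doneUntil: 42}
    s := &raftStore{snapshotIndex: 10}
    maybeSnapshot(w, s) // snapshot taken at index 42
    maybeSnapshot(w, s) // skipped: the watermark has not advanced
}
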
@@ -167,7 +167,7 @@ func convertToEdges(ctx context.Context, nquads []rdf.NQuad) (mutationResult, er
if len(newUids) > 0 {
if err := worker.AssignUidsOverNetwork(ctx, newUids); err != nil {
x.TraceError(ctx, x.Wrapf(err, "Error while GetOrAssignUidsOverNetwork"))
x.TraceError(ctx, x.Wrapf(err, "Error while AssignUidsOverNetwork for newUids: %v", newUids))
return mr, err
}
}
@@ -704,7 +704,9 @@ func main() {
x.Init()
checkFlagsAndInitDirs()
ps, err := store.NewStore(*postingDir)
// All the writes to posting store should be synchronous. We use batched writers
// for posting lists, so the cost of sync writes is amortized.
ps, err := store.NewSyncStore(*postingDir)
x.Checkf(err, "Error initializing postings store")
defer ps.Close()
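
The comment above motivates switching main.go to a synchronous posting store: because posting-list writes go through batched writers, one durable flush is shared by many updates. Below is a toy sketch of that amortization, using a hypothetical in-memory kvStore rather than the actual RocksDB-backed store package.

package main

import "fmt"

// kvStore is a toy key-value store that counts how many durable flushes it performs.
type kvStore struct {
    data  map[string]string
    syncs int
}

func newKVStore() *kvStore { return &kvStore{data: make(map[string]string)} }

// writeBatch applies every put and then pays for a single sync, instead of
// syncing once per key.
func (s *kvStore) writeBatch(batch map[string]string) {
    for k, v := range batch {
        s.data[k] = v
    }
    s.syncs++ // one flush for the whole batch
}

func main() {
    s := newKVStore()
    batch := map[string]string{
        "uid:1#name": "alice",
        "uid:2#name": "bob",
        "uid:3#name": "carol",
    }
    s.writeBatch(batch)
    fmt.Printf("wrote %d keys with %d sync(s)\n", len(s.data), s.syncs)
}
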
@@ -70,6 +70,9 @@ type syncMarks struct {
func (g *syncMarks) create(group uint32) *x.WaterMark {
g.Lock()
defer g.Unlock()
if g.m == nil {
g.m = make(map[uint32]*x.WaterMark)
}
if prev, present := g.m[group]; present {
return prev
@@ -93,6 +96,12 @@ func (g *syncMarks) Get(group uint32) *x.WaterMark {
return g.create(group)
}
// WaterMarkFor returns the synced watermark for the given RAFT group.
// We use this to determine the index to use when creating a new snapshot.
func WaterMarkFor(group uint32) *x.WaterMark {
return marks.Get(group)
}
type counters struct {
ticker *time.Ticker
done uint64
@@ -460,9 +469,7 @@ func batchCommit() {
}
// Add a sleep clause to avoid a busy wait loop if there's no input to commitCh.
sleepFor := 10*time.Millisecond - time.Since(start)
if sleepFor > time.Millisecond {
time.Sleep(sleepFor)
}
time.Sleep(sleepFor)
}
}
}
package raftwal
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@@ -50,18 +52,40 @@ func (w *Wal) prefix(gid uint32) []byte {
return b
}
// Store stores the snapshot, hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, s raftpb.Snapshot, h raftpb.HardState, es []raftpb.Entry) error {
func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
b := w.wals.NewWriteBatch()
defer b.Destroy()
if !raft.IsEmptySnap(s) {
data, err := s.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal snapshot")
if raft.IsEmptySnap(s) {
return nil
}
data, err := s.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal snapshot")
}
// Delete all entries before this snapshot to save disk space.
start := w.entryKey(gid, 0, 0)
last := w.entryKey(gid, s.Metadata.Term, s.Metadata.Index)
itr := w.wals.NewIterator()
defer itr.Close()
for itr.Seek(start); itr.Valid(); itr.Next() {
key := itr.Key().Data()
if bytes.Compare(key, last) > 0 {
break
}
b.Put(w.snapshotKey(gid), data)
b.Delete(key)
}
b.Put(w.snapshotKey(gid), data)
fmt.Printf("Writing snapshot to WAL: %+v\n", s)
return x.Wrapf(w.wals.WriteBatch(b), "wal.Store: While Store Snapshot")
}
// Store stores the snapshot, hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, h raftpb.HardState, es []raftpb.Entry) error {
b := w.wals.NewWriteBatch()
defer b.Destroy()
if !raft.IsEmptyHardState(h) {
data, err := h.Marshal()
@@ -85,7 +109,8 @@ func (w *Wal) Store(gid uint32, s raftpb.Snapshot, h raftpb.HardState, es []raft
// If we get no entries, then the default value of t and i would be zero. That would
// end up deleting all the previous valid raft entry logs. This check avoids that.
if t > 0 || i > 0 {
// Delete all keys above this index.
// When writing an Entry with Index i, any previously-persisted entries
// with Index >= i must be discarded.
start := w.entryKey(gid, t, i+1)
prefix := w.prefix(gid)
itr := w.wals.NewIterator()
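
The raftwal changes above apply two pruning rules: StoreSnapshot deletes every entry at or below the snapshot's index to reclaim disk space, and Store discards stale entries beyond the newly written ones, per the rule that writing an entry at index i invalidates any previously persisted entries with Index >= i. Here is a standalone sketch of both rules, using a hypothetical in-memory wal instead of the real RocksDB key layout that encodes group, term, and index.

package main

import "fmt"

type entry struct{ Term, Index uint64 }

type wal struct{ entries []entry }

// storeSnapshot keeps only entries strictly after the snapshot index; everything
// at or below it is covered by the snapshot and can be deleted.
func (w *wal) storeSnapshot(snapIndex uint64) {
    kept := w.entries[:0]
    for _, e := range w.entries {
        if e.Index > snapIndex {
            kept = append(kept, e)
        }
    }
    w.entries = kept
}

// appendEntries drops previously stored entries with Index >= the first new
// entry's Index, then appends the new ones.
func (w *wal) appendEntries(es ...entry) {
    if len(es) == 0 {
        return
    }
    first := es[0].Index
    kept := w.entries[:0]
    for _, e := range w.entries {
        if e.Index < first {
            kept = append(kept, e)
        }
    }
    w.entries = append(kept, es...)
}

func main() {
    w := &wal{}
    w.appendEntries(entry{1, 1}, entry{1, 2}, entry{1, 3})
    w.appendEntries(entry{2, 2}) // replaces the old entries at indices 2 and 3
    w.storeSnapshot(1)           // index 1 is now covered by the snapshot
    fmt.Println(w.entries)       // [{2 2}]
}
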
@@ -126,10 +126,7 @@ func (s *Store) NewWriteBatch() *rdb.WriteBatch { return rdb.NewWriteBatch() }
// WriteBatch does a batch write to RocksDB from the data in WriteBatch object.
func (s *Store) WriteBatch(wb *rdb.WriteBatch) error {
if err := s.db.Write(s.wopt, wb); err != nil {
return x.Wrap(err)
}
return nil
return x.Wrap(s.db.Write(s.wopt, wb))
}
// NewCheckpoint creates new checkpoint from current store.
@@ -88,10 +88,19 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) error {
var ul *task.List
var err error
if groups().ServesGroup(gid) {
lid, _ := groups().Leader(gid)
n := groups().Node(gid)
if n != nil && lid == 0 {
// This is useful for testing, when the membership information doesn't have a chance
// to propagate.
lid = n.raft.Status().Lead
}
if n != nil && n.id == lid {
x.Trace(ctx, "Calling assignUids as I'm leader of group: %d", gid)
ul, err = assignUids(ctx, num)
if err != nil {
return err
return x.Wrap(err)
}
} else {
@@ -103,6 +112,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) error {
return err
}
defer p.Put(conn)
x.Trace(ctx, "Calling AssignUids for group: %d, addr: %s", gid, addr)
c := NewWorkerClient(conn)
ul, err = c.AssignUids(ctx, num)
@@ -15,6 +15,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
@@ -133,6 +134,10 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
}
func (n *node) Connect(pid uint64, addr string) {
for n == nil {
// Sometimes this function causes a panic. My guess is that n is sometimes still uninitialized.
time.Sleep(time.Second)
}
if pid == n.id {
return
}
@@ -205,7 +210,8 @@ func (n *node) ProposeAndWait(ctx context.Context, proposal *task.Proposal) erro
if err != nil {
return err
}
proposalData := slice[:upto]
proposalData := make([]byte, upto)
copy(proposalData, slice[:upto])
che := make(chan error, 1)
n.props.Store(proposal.Id, che)
@@ -245,6 +251,39 @@ func (n *node) send(m raftpb.Message) {
}
}
func (n *node) batchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
for {
select {
case sm := <-n.messages:
var buf *bytes.Buffer
if b, ok := batches[sm.to]; !ok {
buf = new(bytes.Buffer)
batches[sm.to] = buf
} else {
buf = b
}
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
default:
start := time.Now()
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, data)
buf.Reset()
}
// Add a sleep clause to avoid a busy wait loop if there's no input to n.messages.
sleepFor := 10*time.Millisecond - time.Since(start)
time.Sleep(sleepFor)
}
}
}
func (n *node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -277,31 +316,6 @@ func (n *node) doSendMessage(to uint64, data []byte) {
}
}
func (n *node) batchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
ticker := time.NewTicker(10 * time.Millisecond)
for {
select {
case sm := <-n.messages:
if _, ok := batches[sm.to]; !ok {
batches[sm.to] = new(bytes.Buffer)
}
buf := batches[sm.to]
binary.Write(buf, binary.LittleEndian, uint32(len(sm.data)))
buf.Write(sm.data)
case <-ticker.C:
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
go n.doSendMessage(to, buf.Bytes())
buf.Reset()
}
}
}
}
func (n *node) processMutation(e raftpb.Entry, m *task.Mutations) error {
// TODO: Need to pass node and entry index.
rv := x.RaftValue{Group: n.gid, Index: e.Index}
@@ -333,7 +347,7 @@ func (n *node) process(e raftpb.Entry, pending chan struct{}) {
pending <- struct{}{} // This will block until we can write to it.
var proposal task.Proposal
x.Check(proposal.Unmarshal(e.Data))
x.Checkf(proposal.Unmarshal(e.Data), "Unable to parse entry: %+v", e)
var err error
if proposal.Mutations != nil {
@@ -351,7 +365,7 @@ func (n *node) processCommitCh() {
pending := make(chan struct{}, numPendingMutations)
for e := range n.commitCh {
if e.Data == nil {
if len(e.Data) == 0 {
n.applied.Ch <- x.Mark{Index: e.Index, Done: true}
continue
}
@@ -397,31 +411,32 @@ func (n *node) saveToStorage(s raftpb.Snapshot, h raftpb.HardState,
n.store.Append(es)
}
func (n *node) processSnapshot(s raftpb.Snapshot) {
lead := n.raft.Status().Lead
if lead == 0 {
return
}
addr := n.peers.Get(lead)
x.AssertTruef(addr != "", "Should have the leader address: %v", lead)
func (n *node) retrieveSnapshot(rc task.RaftContext) {
addr := n.peers.Get(rc.Id)
x.AssertTruef(addr != "", "Should have the address for %d", rc.Id)
pool := pools().get(addr)
x.AssertTruef(pool != nil, "Leader: %d pool should not be nil", lead)
x.AssertTruef(pool != nil, "Pool shouldn't be nil for address: %v for id: %v", addr, rc.Id)
_, err := populateShard(context.TODO(), pool, 0)
x.Checkf(err, "processSnapshot")
x.AssertTrue(rc.Group == n.gid)
x.Check2(populateShard(n.ctx, pool, n.gid))
}
func (n *node) Run() {
firstRun := true
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.raft.Tick()
case rd := <-n.raft.Ready():
x.Check(n.wal.Store(n.gid, rd.Snapshot, rd.HardState, rd.Entries))
x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
x.Check(n.wal.Store(n.gid, rd.HardState, rd.Entries))
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
rcBytes, err := n.raftContext.Marshal()
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
@@ -431,7 +446,18 @@ func (n *node) Run() {
}
if !raft.IsEmptySnap(rd.Snapshot) {
n.processSnapshot(rd.Snapshot)
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
var rc task.RaftContext
x.Check(rc.Unmarshal(rd.Snapshot.Data))
if rc.Id != n.id {
fmt.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
n.retrieveSnapshot(rc)
fmt.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
} else {
fmt.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
}
}
if len(rd.CommittedEntries) > 0 {
x.Trace(n.ctx, "Found %d committed entries", len(rd.CommittedEntries))
@@ -467,25 +493,47 @@ func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
}
func (n *node) snapshotPeriodically() {
ticker := time.NewTicker(10 * time.Minute)
if n.gid == 0 {
// Group zero is dedicated to membership information, whose state we don't persist.
// So, taking snapshots would end up deleting the RAFT entries that we need to
// regenerate the state on a crash. Therefore, don't take snapshots.
return
}
var prev string
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
le, err := n.store.LastIndex()
x.Checkf(err, "Unable to retrieve last index")
water := posting.WaterMarkFor(n.gid)
le := water.DoneUntil()
existing, err := n.store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
if le <= si {
msg := fmt.Sprintf("Current watermark %d <= previous snapshot %d. Skipping.", le, si)
if msg != prev {
prev = msg
fmt.Println(msg)
}
continue
}
msg := fmt.Sprintf("Taking snapshot for group: %d at watermark: %d\n", n.gid, le)
if msg != prev {
prev = msg
fmt.Println(msg)
}
msg := fmt.Sprintf("Snapshot from %v", strconv.FormatUint(n.id, 10))
_, err = n.store.CreateSnapshot(le, nil, []byte(msg))
rc, err := n.raftContext.Marshal()
x.Check(err)
s, err := n.store.CreateSnapshot(le, nil, rc)
x.Checkf(err, "While creating snapshot")
x.Checkf(n.store.Compact(le), "While compacting snapshot")
x.Check(n.wal.StoreSnapshot(n.gid, s))
case <-n.done:
return
@@ -515,7 +563,7 @@ func (n *node) joinPeers() {
x.AssertTruef(pool != nil, "Unable to find addr for peer: %d", pid)
// Bring the instance up to speed first.
_, err := populateShard(n.ctx, pool, 0)
_, err := populateShard(n.ctx, pool, n.gid)
x.Checkf(err, "Error while populating shard")
conn, err := pool.Get()
@@ -599,7 +647,7 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
go n.Run()
// TODO: Find a better way to snapshot, so we don't lose the membership
// state information, which isn't persisted.
// go n.snapshotPeriodically()
go n.snapshotPeriodically()
go n.batchAndSendMessages()
}
@@ -633,6 +681,9 @@ func (w *grpcWorker) RaftMessage(ctx context.Context, query *Payload) (*Payload,
}
for idx := 0; idx < len(query.Data); {
x.AssertTruef(len(query.Data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(query.Data[idx:]))
sz := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
idx += 4
msg := raftpb.Message{}
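
For reference, here is a standalone sketch of the length-prefixed framing used by batchAndSendMessages and decoded in RaftMessage above: each message is written as a little-endian uint32 length followed by its payload, and the receiver walks the buffer the same way. encodeBatch and decodeBatch are illustrative names, not functions from this codebase.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encodeBatch frames each message as a 4-byte little-endian length plus payload.
func encodeBatch(msgs [][]byte) []byte {
    var buf bytes.Buffer
    var lenbuf [4]byte
    for _, m := range msgs {
        binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(m)))
        buf.Write(lenbuf[:])
        buf.Write(m)
    }
    return buf.Bytes()
}

// decodeBatch reverses encodeBatch, reading a length prefix and then that many bytes.
func decodeBatch(data []byte) [][]byte {
    var msgs [][]byte
    for idx := 0; idx < len(data); {
        sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
        idx += 4
        msgs = append(msgs, data[idx:idx+sz])
        idx += sz
    }
    return msgs
}

func main() {
    batch := encodeBatch([][]byte{[]byte("hello"), []byte("raft")})
    for _, m := range decodeBatch(batch) {
        fmt.Println(string(m))
    }
}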