Newer
Older
/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
Manish R Jain
committed
package worker
import (
Manish R Jain
committed
"encoding/binary"
Manish R Jain
committed
"fmt"
"log"
Manish R Jain
committed
"math/rand"
Manish R Jain
committed
"sync"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
Manish R Jain
committed
"github.com/dgraph-io/dgraph/x"
)
errorNodeIDExists = "Error Node ID already exists in the cluster"
type peerPoolEntry struct {
// Never the empty string. Possibly a bogus address -- bad port number, the value
// of *myAddr, or some screwed up Raft config.
// An owning reference to a pool for this peer (or nil if addr is sufficiently bogus).
// peerPool stores the peers' addresses and our connections to them. It has exactly one
// entry for every peer other than ourselves. Some of these peers might be unreachable or
// have bogus (but never empty) addresses.
Manish R Jain
committed
type peerPool struct {
sync.RWMutex
peers map[uint64]peerPoolEntry
Manish R Jain
committed
}
// Sentinel errors returned by (*peerPool).getPool.
var (
// errNoPeerPoolEntry is returned when the peers map has no entry for the id.
errNoPeerPoolEntry = fmt.Errorf("no peerPool entry")
// errNoPeerPool is returned when an entry exists but its pool is nil.
errNoPeerPool = fmt.Errorf("no peerPool pool, could not connect")
)
// getPool looks up the connection pool registered for peer id.
//
// It fails with errNoPeerPoolEntry when the peer is unknown, and with
// errNoPeerPool when the peer is known but its entry carries a nil pool. The
// latter happens when the peer was configured so badly (a totally bogus addr)
// that no pool could be made, so getPool can fail even though get(id)
// succeeds. (A reasonable refactoring would have us make a pool, one that has
// a nil gRPC connection.)
//
// You must call pools().release on the returned pool.
func (p *peerPool) getPool(id uint64) (*pool, error) {
	p.RLock()
	defer p.RUnlock()

	entry, found := p.peers[id]
	switch {
	case !found:
		return nil, errNoPeerPoolEntry
	case entry.poolOrNil == nil:
		return nil, errNoPeerPool
	default:
		return entry.poolOrNil, nil
	}
}
func (p *peerPool) get(id uint64) (string, bool) {
Manish R Jain
committed
p.RLock()
defer p.RUnlock()
ret, ok := p.peers[id]
return ret.addr, ok
Manish R Jain
committed
}
func (p *peerPool) set(id uint64, addr string, pl *pool) {
Manish R Jain
committed
p.Lock()
defer p.Unlock()
if old, ok := p.peers[id]; ok {
if old.poolOrNil != nil {
pools().release(old.poolOrNil)
}
}
p.peers[id] = peerPoolEntry{addr, pl}
Manish R Jain
committed
}
// proposalCtx is the per-proposal state kept in proposals while a proposal is
// in flight.
type proposalCtx struct {
// ch is the channel ProposeAndWait reads the proposal's resulting error from.
ch chan error
// ctx is the context handed back by (*proposals).Ctx for this proposal.
ctx context.Context
}
Manish R Jain
committed
type proposals struct {
sync.RWMutex
Manish R Jain
committed
}
func (p *proposals) Store(pid uint32, pctx *proposalCtx) bool {
Manish R Jain
committed
p.Lock()
defer p.Unlock()
if _, has := p.ids[pid]; has {
return false
}
Manish R Jain
committed
}
// Ctx returns the context stored for proposal pid. The boolean is false, and
// the context nil, when pid is not registered.
func (p *proposals) Ctx(pid uint32) (context.Context, bool) {
	p.RLock()
	defer p.RUnlock()

	entry, found := p.ids[pid]
	if !found {
		return nil, false
	}
	return entry.ctx, true
}
Manish R Jain
committed
func (p *proposals) Done(pid uint32, err error) {
p.Lock()
Manish R Jain
committed
if has {
delete(p.ids, pid)
}
p.Unlock()
if !has {
return
}
Manish R Jain
committed
}
// Has reports whether proposal pid is currently registered.
func (p *proposals) Has(pid uint32) bool {
	p.RLock()
	defer p.RUnlock()

	if _, found := p.ids[pid]; found {
		return true
	}
	return false
}
// sendmsg is one outgoing message queued on node.messages.
type sendmsg struct {
// to is the ID of the destination node.
to uint64
// data is the message payload as raw bytes.
data []byte
}
Manish R Jain
committed
type node struct {
x.SafeMutex
// SafeMutex is for fields which can be changed after init.
_confState *raftpb.ConfState
_raft raft.Node
// Fields which are never changed after init.
cfg *raft.Config
ctx context.Context
stop chan struct{} // to send the stop signal to Run
done chan struct{} // to check whether node is running or not
id uint64
messages chan sendmsg
peers peerPool
props proposals
raftContext *protos.RaftContext
store *raft.MemoryStorage
// applied is used to keep track of the applied RAFT proposals.
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to RocksDB).
applied x.WaterMark
Manish R Jain
committed
}
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// SetRaft attaches the provided raft.Node to this node. A raft.Node may only
// be attached once: the call asserts that none has been set before.
func (n *node) SetRaft(r raft.Node) {
	n.Lock()
	defer n.Unlock()

	unset := n._raft == nil
	x.AssertTrue(unset)
	n._raft = r
}
// Raft returns the raft.Node currently attached to this node, read under the
// node's read lock.
func (n *node) Raft() raft.Node {
	n.RLock()
	defer n.RUnlock()

	current := n._raft
	return current
}
// SetConfState records cs, the latest ConfState generated by ApplyConfChange,
// under the node's write lock.
func (n *node) SetConfState(cs *raftpb.ConfState) {
	n.Lock()
	n._confState = cs
	n.Unlock()
}
// ConfState returns the most recently stored ConfState, read under the node's
// read lock.
func (n *node) ConfState() *raftpb.ConfState {
	n.RLock()
	defer n.RUnlock()

	latest := n._confState
	return latest
}
Manish R Jain
committed
func newNode(gid uint32, id uint64, myAddr string) *node {
x.Printf("Node with GroupID: %v, ID: %v\n", gid, id)
Manish R Jain
committed
peers := peerPool{
peers: make(map[uint64]peerPoolEntry),
Manish R Jain
committed
}
props := proposals{
ids: make(map[uint32]*proposalCtx),
Manish R Jain
committed
}
store := raft.NewMemoryStorage()
rc := &protos.RaftContext{
Manish R Jain
committed
Addr: myAddr,
Group: gid,
Id: id,
}
n := &node{
ctx: context.Background(),
id: id,
gid: gid,
store: store,
cfg: &raft.Config{
ID: id,
ElectionTick: 10, // 200 ms if we call Tick() every 20 ms.
HeartbeatTick: 1, // 20 ms if we call Tick() every 20 ms.
Manish R Jain
committed
Storage: store,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
Logger: &raft.DefaultLogger{Logger: x.Logger},
Manish R Jain
committed
},
applyCh: make(chan raftpb.Entry, numPendingMutations),
Manish R Jain
committed
peers: peers,
props: props,
raftContext: rc,
messages: make(chan sendmsg, 1000),
stop: make(chan struct{}),
done: make(chan struct{}),
Manish R Jain
committed
}
n.applied = x.WaterMark{Name: fmt.Sprintf("Committed: Group %d", n.gid)}
Manish R Jain
committed
return n
}
// GetPeer returns the address recorded for peer pid and whether the peer is
// known. Never returns ("", true).
func (n *node) GetPeer(pid uint64) (string, bool) {
	addr, known := n.peers.get(pid)
	return addr, known
}
// GetPeerPool returns the connection pool for peer pid. It can error for some
// pid's for which GetPeer succeeds. You must call release on the returned
// pool.
func (n *node) GetPeerPool(pid uint64) (*pool, error) {
	pl, err := n.peers.getPool(pid)
	return pl, err
}
// SetPeer records addr and poolOrNil as the entry for peer pid. addr must not
// be empty; this is asserted.
func (n *node) SetPeer(pid uint64, addr string, poolOrNil *pool) {
	hasAddr := addr != ""
	x.AssertTruef(hasAddr, "SetPeer for peer %d has empty addr.", pid)
	n.peers.set(pid, addr, poolOrNil)
}
// Connects the node and makes its peerPool refer to the constructed pool and address
// (possibly updating ourselves from the old address.) (Unless pid is ourselves, in which
// case this does nothing.)
Manish R Jain
committed
func (n *node) Connect(pid uint64, addr string) {
if pid == n.id {
return
}
if paddr, ok := n.GetPeer(pid); ok && paddr == addr {
// Already connected.
Manish R Jain
committed
return
}
// Here's what we do. Right now peerPool maps peer node id's to addr values. If
// a *pool can be created, good, but if not, we still create a peerPoolEntry with
// a nil *pool.
p, ok := pools().connect(addr)
if !ok {
// TODO: Note this fact in more general peer health info somehow.
x.Printf("Peer %d claims same host as me\n", pid)
}
n.SetPeer(pid, addr, p)
Manish R Jain
committed
}
func (n *node) AddToCluster(ctx context.Context, pid uint64) error {
addr, ok := n.GetPeer(pid)
x.AssertTruef(ok, "Unable to find conn pool for peer: %d", pid)
rc := &protos.RaftContext{
Addr: addr,
Group: n.raftContext.Group,
Id: pid,
}
rcBytes, err := rc.Marshal()
x.Check(err)
return n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
Manish R Jain
committed
ID: pid,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
Manish R Jain
committed
})
}
Manish R Jain
committed
// header carries a proposal ID and a message ID. It serializes to a fixed
// six-byte little-endian form: proposalId in bytes 0-3, msgId in bytes 4-5.
type header struct {
	proposalId uint32
	msgId      uint16
}

// Length returns the size in bytes of an encoded header.
func (h *header) Length() int {
	const proposalIDBytes, msgIDBytes = 4, 2
	return proposalIDBytes + msgIDBytes
}

// Encode returns a freshly allocated slice holding the encoded header.
func (h *header) Encode() []byte {
	le := binary.LittleEndian
	out := make([]byte, h.Length())
	le.PutUint32(out[0:4], h.proposalId)
	le.PutUint16(out[4:6], h.msgId)
	return out
}

// Decode fills h from the first six bytes of in. It panics if in is shorter
// than that.
func (h *header) Decode(in []byte) {
	le := binary.LittleEndian
	h.proposalId = le.Uint32(in[0:4])
	h.msgId = le.Uint16(in[4:6])
}
func (n *node) ProposeAndWait(ctx context.Context, proposal *protos.Proposal) error {
return x.Errorf("RAFT isn't initialized yet")
}
x.PendingProposals.Add(1)
defer func() { <-pendingProposals; x.PendingProposals.Add(-1) }()
if ctx.Err() != nil {
return ctx.Err()
}
// Do a type check here if schema is present
// In very rare cases invalid entries might pass through raft, which would
// be persisted, we do best effort schema check while writing
if proposal.Mutations != nil {
for _, edge := range proposal.Mutations.Edges {
if typ, err := schema.State().TypeOf(edge.Attr); err != nil {
continue
} else if err := validateAndConvert(edge, typ); err != nil {
return err
}
}
for _, schema := range proposal.Mutations.Schema {
if err := checkSchema(schema); err != nil {
return err
}
}
pctx := &proposalCtx{
ch: che,
ctx: ctx,
}
Manish R Jain
committed
Manish R Jain
committed
if err != nil {
return err
}
Manish R Jain
committed
// we don't timeout on a mutation which has already been proposed.
if err = n.Raft().Propose(ctx, slice[:upto]); err != nil {
Manish R Jain
committed
return x.Wrapf(err, "While proposing")
}
// Wait for the proposal to be committed.
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Waiting for the proposal: mutations.")
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Waiting for the proposal: membership update.")
}
log.Fatalf("Unknown proposal")
err = <-che
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf(err.Error())
Manish R Jain
committed
}
Manish R Jain
committed
}
Manish R Jain
committed
func (n *node) send(m raftpb.Message) {
x.AssertTruef(n.id != m.To, "Seding message to itself")
if m.Type != raftpb.MsgHeartbeat && m.Type != raftpb.MsgHeartbeatResp {
x.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To)
select {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
default:
x.Fatalf("Unable to push messages to channel in send")
Manish R Jain
committed
const (
// messageBatchSoftLimit is the threshold batchAndSendMessages compares its
// running batch size against before it stops slurping more messages. It is
// "soft" because the check happens after a message has already been added.
// NOTE(review): presumably a byte count — confirm against totalSize.
messageBatchSoftLimit = 10000000
)
func (n *node) batchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
for {
sm := <-n.messages
slurp_loop:
for {
var buf *bytes.Buffer
if b, ok := batches[sm.to]; !ok {
buf = new(bytes.Buffer)
batches[sm.to] = buf
} else {
buf = b
}
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
if totalSize > messageBatchSoftLimit {
// We limit the batch size, but we aren't pushing back on
// n.messages, because the loop below spawns a goroutine
// to do its dirty work. This is good because right now
// (*node).send fails(!) if the channel is full.
break
}
select {
case sm = <-n.messages:
default:
break slurp_loop
}
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, data)
buf.Reset()
func (n *node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Manish R Jain
committed
pool, err := n.GetPeerPool(to)
if err != nil {
// No such peer exists or we got handed a bogus config (bad addr), so we
// can't send messages to this peer.
return
}
defer pools().release(pool)
conn := pool.Get()
Manish R Jain
committed
c := protos.NewWorkerClient(conn)
p := &protos.Payload{Data: data}
ch := make(chan error, 1)
go func() {
_, err = c.RaftMessage(ctx, p)
ch <- err
}()
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
// processMutation applies the edges of m through runMutations. The context
// passed down carries, under the key "raft", an x.RaftValue identifying this
// node's group and the raft index of the entry being applied. Any error from
// runMutations is printed to the trace in ctx (if there is one) and returned.
func (n *node) processMutation(ctx context.Context, index uint64, m *protos.Mutations) error {
	// TODO: Need to pass node and entry index.
	rv := x.RaftValue{Group: n.gid, Index: index}
	ctx = context.WithValue(ctx, "raft", rv)
	if err := runMutations(ctx, m.Edges); err != nil {
		if tr, ok := trace.FromContext(ctx); ok {
			tr.LazyPrintf(err.Error())
		}
		return err
	}
	return nil
}
// processSchemaMutations applies the schema updates in m through
// runSchemaMutations. The context passed down is derived from the node's
// context and carries, under the key "raft", an x.RaftValue with this node's
// group and the index of entry e. Any error is printed to the trace in the
// node's context (if there is one) and returned.
func (n *node) processSchemaMutations(e raftpb.Entry, m *protos.Mutations) error {
	// TODO: Need to pass node and entry index.
	ctx := context.WithValue(n.ctx, "raft", x.RaftValue{Group: n.gid, Index: e.Index})

	err := runSchemaMutations(ctx, m.Schema)
	if err == nil {
		return nil
	}
	if tr, ok := trace.FromContext(n.ctx); ok {
		tr.LazyPrintf(err.Error())
	}
	return err
}
// processMembership logs the group, address, leader flag and dead flag of the
// membership update mm. It always returns nil.
func (n *node) processMembership(index uint64, mm *protos.Membership) error {
	const format = "group: %v Addr: %q leader: %v dead: %v\n"
	x.Printf(format, mm.GroupId, mm.Addr, mm.Leader, mm.AmDead)
	return nil
}
func (n *node) process(index uint64, proposal *protos.Proposal, pending chan struct{}) {
n.applied.Ch <- x.Mark{Index: index, Done: true}
posting.SyncMarkFor(n.gid).Ch <- x.Mark{Index: index, Done: true}
pending <- struct{}{} // This will block until we can write to it.
x.ActiveMutations.Add(1)
defer x.ActiveMutations.Add(-1)
Manish R Jain
committed
var err error
if proposal.Mutations != nil {
var ctx context.Context
var has bool
if ctx, has = n.props.Ctx(proposal.Id); !has {
ctx = n.ctx
}
err = n.processMutation(ctx, index, proposal.Mutations)
} else if proposal.Membership != nil {
err = n.processMembership(index, proposal.Membership)
Manish R Jain
committed
}
n.props.Done(proposal.Id, err)
<-pending // Release one.
}
Manish R Jain
committed
Manish R Jain
committed
func (n *node) processApplyCh() {
Manish R Jain
committed
pending := make(chan struct{}, numPendingMutations)
for e := range n.applyCh {
mark := x.Mark{Index: e.Index, Done: true}
n.applied.Ch <- mark
posting.SyncMarkFor(n.gid).Ch <- mark
if e.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
if len(cc.Context) > 0 {
var rc protos.RaftContext
x.Check(rc.Unmarshal(cc.Context))
n.Connect(rc.Id, rc.Addr)
}
cs := n.Raft().ApplyConfChange(cc)
n.SetConfState(cs)
n.applied.Ch <- mark
posting.SyncMarkFor(n.gid).Ch <- mark
continue
Manish R Jain
committed
}
x.AssertTrue(e.Type == raftpb.EntryNormal)
// The following effort is only to apply schema in a blocking fashion.
// Once we have a scheduler, this should go away.
// TODO: Move the following to scheduler.
// We derive the schema here if it's not present
// Since raft committed logs are serialized, we can derive
// schema here without any locking
proposal := &protos.Proposal{}
if err := proposal.Unmarshal(e.Data); err != nil {
log.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
// process schema mutations before
if proposal.Mutations.Schema != nil {
// Wait for applied watermark to reach till previous index
// All mutations before this should use old schema and after this
// should use new schema
n.waitForSyncMark(n.ctx, e.Index-1)
if err := n.processSchemaMutations(e, proposal.Mutations); err != nil {
n.applied.Ch <- mark
posting.SyncMarkFor(n.gid).Ch <- mark
n.props.Done(proposal.Id, err)
// stores a map of predicate and type of first mutation for each predicate
schemaMap := make(map[string]types.TypeID)
for _, edge := range proposal.Mutations.Edges {
if _, ok := schemaMap[edge.Attr]; !ok {
schemaMap[edge.Attr] = posting.TypeID(edge)
}
}
for attr, storageType := range schemaMap {
if _, err := schema.State().TypeOf(attr); err != nil {
// Schema doesn't exist
// Since committed entries are serialized, updateSchemaIfMissing is not
// needed, In future if schema needs to be changed, it would flow through
// raft so there won't be race conditions between read and update schema
updateSchemaType(attr, storageType, e.Index, n.raftContext.Group)
Manish R Jain
committed
}
}
// saveToStorage writes a raft Ready's snapshot, hard state and entries to the
// node's in-memory raft storage.
//
// A non-empty snapshot whose index is not beyond the store's last index is
// treated as stale: the function returns without touching the store. Any
// storage error is fatal.
func (n *node) saveToStorage(s raftpb.Snapshot, h raftpb.HardState,
	es []raftpb.Entry) {
	if !raft.IsEmptySnap(s) {
		le, err := n.store.LastIndex()
		if err != nil {
			log.Fatalf("While retrieving last index: %v\n", err)
		}
		if s.Metadata.Index <= le {
			return
		}

		if err := n.store.ApplySnapshot(s); err != nil {
			log.Fatalf("Applying snapshot: %v", err)
		}
	}

	if !raft.IsEmptyHardState(h) {
		// Don't drop the error: treat it like the other storage failures here.
		if err := n.store.SetHardState(h); err != nil {
			log.Fatalf("Setting hard state: %v", err)
		}
	}
	if err := n.store.Append(es); err != nil {
		log.Fatalf("Appending entries: %v", err)
	}
}
func (n *node) retrieveSnapshot(peerID uint64) {
pool, err := n.GetPeerPool(peerID)
// err is just going to be errNoConnection
log.Fatalf("Cannot retrieve snapshot from peer %v, no connection. Error: %v\n",
peerID, err)
}
defer pools().release(pool)
// Get index of last committed.
lastIndex, err := n.store.LastIndex()
x.Checkf(err, "Error while getting last index")
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
// Safe to keep this line
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
Janardhan Reddy
committed
posting.EvictGroup(n.gid)
if _, err := populateShard(n.ctx, pool, n.gid); err != nil {
// TODO: We definitely don't want to just fall flat on our face if we can't
// retrieve a simple snapshot.
log.Fatalf("Cannot retrieve snapshot from peer %v, error: %v\n", peerID, err)
}
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
x.Checkf(schema.LoadFromDb(n.gid), "Error while initilizating schema")
Manish R Jain
committed
}
func (n *node) Run() {
// See also our configuration of HeartbeatTick and ElectionTick.
ticker := time.NewTicker(20 * time.Millisecond)
rcBytes, err := n.raftContext.Marshal()
x.Check(err)
Manish R Jain
committed
for {
select {
Manish R Jain
committed
case rd := <-n.Raft().Ready():
if rd.SoftState != nil {
if rd.RaftState == raft.StateFollower && leader {
// stepped down as leader do a sync membership immediately
groups().syncMemberships()
} else if rd.RaftState == raft.StateLeader && !leader {
// TODO:wait for apply watermark ??
leaseMgr().resetLease(n.gid)
groups().syncMemberships()
}
leader = rd.RaftState == raft.StateLeader
}
x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
x.Check(n.wal.Store(n.gid, rd.HardState, rd.Entries))
Manish R Jain
committed
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
Manish R Jain
committed
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
Manish R Jain
committed
n.send(msg)
}
Manish R Jain
committed
if !raft.IsEmptySnap(rd.Snapshot) {
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
var rc protos.RaftContext
x.AssertTrue(rc.Group == n.gid)
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
n.retrieveSnapshot(rc.Id)
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
Manish R Jain
committed
}
Manish R Jain
committed
if len(rd.CommittedEntries) > 0 {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Found %d committed entries", len(rd.CommittedEntries))
}
Manish R Jain
committed
}
for _, entry := range rd.CommittedEntries {
// Need applied watermarks for schema mutation also for read linearazibility
// Applied watermarks needs to be emitted as soon as possible sequentially.
// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false}
// then doneUntil would be set as 4 as soon as Mark{4,true} is done and before
// Mark{3, false} is emitted. So it's safer to emit watermarks as soon as
// possible sequentially
status := x.Mark{Index: entry.Index, Done: false}
n.applied.Ch <- status
posting.SyncMarkFor(n.gid).Ch <- status
// Just queue up to be processed. Don't wait on them.
n.applyCh <- entry
Manish R Jain
committed
}
if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
Manish R Jain
committed
if peerId, has := groups().Peer(n.gid, Config.RaftId); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
go func() {
select {
case <-n.ctx.Done(): // time out
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("context timed out while transfering leadership")
}
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Timed out transfering leadership")
}
}
n.Raft().Stop()
close(n.done)
}()
} else {
n.Raft().Stop()
close(n.done)
}
case <-n.done:
Manish R Jain
committed
return
}
}
}
// Stop signals the node to stop and waits until n.done is closed. If n.done
// is already closed it returns immediately.
func (n *node) Stop() {
// Either hand the stop signal over on n.stop, or notice that n.done is
// already closed; whichever is ready first wins.
select {
case n.stop <- struct{}{}:
case <-n.done:
// already stopped.
return
}
<-n.done // wait for Run to respond.
}
Manish R Jain
committed
func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
return n.Raft().Step(ctx, msg)
Manish R Jain
committed
}
func (n *node) snapshotPeriodically() {
if n.gid == 0 {
// Group zero is dedicated for membership information, whose state we don't persist.
// So, taking snapshots would end up deleting the RAFT entries that we need to
// regenerate the state on a crash. Therefore, don't take snapshots.
return
}
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
n.snapshot(Config.MaxPendingCount)
Manish R Jain
committed
case <-n.done:
return
}
Manish R Jain
committed
}
}
if n.gid == 0 {
// Group zero is dedicated for membership information, whose state we don't persist.
// So, taking snapshots would end up deleting the RAFT entries that we need to
// regenerate the state on a crash. Therefore, don't take snapshots.
return
}
water := posting.SyncMarkFor(n.gid)
le := water.DoneUntil()
existing, err := n.store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
if le <= si+skip {
return
}
snapshotIdx := le - skip
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Taking snapshot for group: %d at watermark: %d\n", n.gid, snapshotIdx)
}
rc, err := n.raftContext.Marshal()
x.Check(err)
s, err := n.store.CreateSnapshot(snapshotIdx, n.ConfState(), rc)
x.Checkf(err, "While creating snapshot")
x.Checkf(n.store.Compact(snapshotIdx), "While compacting snapshot")
x.Check(n.wal.StoreSnapshot(n.gid, s))
}
// Get leader information for MY group.
Manish R Jain
committed
n.Connect(pid, paddr)
x.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid)
Manish R Jain
committed
pool, err := pools().get(paddr)
if err != nil {
log.Fatalf("Unable to get pool for addr: %q for peer: %d, error: %v\n", paddr, pid, err)
}
defer pools().release(pool)
Manish R Jain
committed
Manish R Jain
committed
// Bring the instance up to speed first.
// Raft would decide whether snapshot needs to fetched or not
// so populateShard is not needed
// _, err := populateShard(n.ctx, pool, n.gid)
// x.Checkf(err, "Error while populating shard")
Manish R Jain
committed
c := protos.NewWorkerClient(conn)
x.Printf("Calling JoinCluster")
_, err = c.JoinCluster(n.ctx, n.raftContext)
Manish R Jain
committed
x.Checkf(err, "Error while joining cluster")
x.Printf("Done with JoinCluster call\n")
}
func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
n.wal = wal
var sp raftpb.Snapshot
sp, rerr = wal.Snapshot(n.gid)
if rerr != nil {
return
}
var term, idx uint64
if !raft.IsEmptySnap(sp) {
x.Printf("Found Snapshot: %+v\n", sp)
restart = true
if rerr = n.store.ApplySnapshot(sp); rerr != nil {
return
}
term = sp.Metadata.Term
idx = sp.Metadata.Index
n.applied.SetDoneUntil(idx)
posting.SyncMarkFor(n.gid).SetDoneUntil(idx)
}
var hd raftpb.HardState
hd, rerr = wal.HardState(n.gid)
if rerr != nil {
return
}
if !raft.IsEmptyHardState(hd) {
x.Printf("Found hardstate: %+v\n", sp)
restart = true
if rerr = n.store.SetHardState(hd); rerr != nil {
return
Manish R Jain
committed
}
}
var es []raftpb.Entry
es, rerr = wal.Entries(n.gid, term, idx)
if rerr != nil {
return
}
x.Printf("Group %d found %d entries\n", n.gid, len(es))
if len(es) > 0 {
restart = true
}
rerr = n.store.Append(es)
return
}
// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode(wal *raftwal.Wal) {
restart, err := n.initFromWal(wal)
x.Check(err)
if restart {
x.Printf("Restarting node for group: %d\n", n.gid)
n.SetRaft(raft.RestartNode(n.cfg))
x.Printf("New Node for group: %d\n", n.gid)
if groups().HasPeer(n.gid) {
n.joinPeers()
n.SetRaft(raft.StartNode(n.cfg, nil))
} else {
peers := []raft.Peer{{ID: n.id}}
n.SetRaft(raft.StartNode(n.cfg, peers))
// Trigger election, so this node can become the leader of this single-node cluster.
n.canCampaign = true
Manish R Jain
committed
go n.Run()
// TODO: Find a better way to snapshot, so we don't lose the membership
// state information, which isn't persisted.
Manish R Jain
committed
}
func (n *node) AmLeader() bool {
r := n.Raft()
return r.Status().Lead == r.Status().ID
Manish R Jain
committed
}
var (
// errNoNode is a sentinel error whose message says that no node has been
// set up yet.
errNoNode = fmt.Errorf("No node has been set up yet")
)
func applyMessage(ctx context.Context, msg raftpb.Message) error {
var rc protos.RaftContext
x.Check(rc.Unmarshal(msg.Context))
node := groups().Node(rc.Group)