/*
 * This file is available under the Apache License, Version 2.0,
 * with the Commons Clause restriction.
 */
package worker
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/dgo/protos/api"
dy "github.com/dgraph-io/dgo/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
)
type proposalCtx struct {
ch chan error
ctx context.Context
// Since each proposal consists of multiple tasks, we store any
// non-nil error returned by a task.
err error
Pawan Rawal
committed
index uint64 // RAFT index for the proposal.
Janardhan Reddy
committed
// Used for writing all deltas at the end.
txn *posting.Txn
}
type proposals struct {
sync.RWMutex
// The key is a hex-encoded version of <raft_id_of_node><random_uint64>,
// which ensures it is not the same across replicas.
keys map[string]*proposalCtx
}
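// uniqueKey returns a hex-encoded 16-byte key: this node's raft id followed
// by 8 random bytes.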
func uniqueKey() string {
b := make([]byte, 16)
copy(b[:8], groups().Node.raftIdBuffer)
groups().Node.rand.Read(b[8:])
return hex.EncodeToString(b)
}
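// Store registers pctx under key. It returns false if a proposal with the
// same key is already present.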
func (p *proposals) Store(key string, pctx *proposalCtx) bool {
p.Lock()
defer p.Unlock()
if _, has := p.keys[key]; has {
return false
}
p.keys[key] = pctx
return true
}
func (p *proposals) pctx(key string) *proposalCtx {
p.RLock()
defer p.RUnlock()
return p.keys[key]
}
func (p *proposals) CtxAndTxn(key string) (context.Context, *posting.Txn) {
p.RLock()
defer p.RUnlock()
pd, has := p.keys[key]
x.AssertTrue(has)
return pd.ctx, pd.txn
}
func (p *proposals) Done(key string, err error) {
p.Lock()
defer p.Unlock()
pd, has := p.keys[key]
if !has {
return
}
if err != nil {
pd.err = err
}
delete(p.keys, key)
pd.ch <- pd.err
}
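// node wraps conn.Node with the group-specific state needed to drive the
// Raft loop: the proposal map, the apply channel and the channel used to
// queue linearizable read requests.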
type node struct {
*conn.Node
// Changed after init but not protected by SafeMutex
requestCh chan linReadReq
// Fields which are never changed after init.
applyCh chan raftpb.Entry
ctx context.Context
stop chan struct{} // to send the stop signal to Run
done chan struct{} // to check whether node is running or not
gid uint32
props proposals
canCampaign bool
rand *rand.Rand
raftIdBuffer []byte
}
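// WaitForMinProposal blocks until this node has applied at least the raft
// index recorded for its group in the given LinRead, or does a server-side
// linearizable read if that was requested.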
func (n *node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
if read == nil {
return nil
}
if read.Sequencing == api.LinRead_SERVER_SIDE {
return n.WaitLinearizableRead(ctx)
}
if read.Ids == nil {
return nil
}
gid := n.RaftContext.Group
min := read.Ids[gid]
return n.Applied.WaitForMark(ctx, min)
}
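// lockedSource wraps a rand.Source with a mutex so it can be shared safely
// across goroutines.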
type lockedSource struct {
lk sync.Mutex
src rand.Source
}
func (r *lockedSource) Int63() int64 {
r.lk.Lock()
defer r.lk.Unlock()
return r.src.Int63()
}
func (r *lockedSource) Seed(seed int64) {
r.lk.Lock()
defer r.lk.Unlock()
r.src.Seed(seed)
}
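// newNode creates a node for the given group and id, backed by the provided
// raft WAL storage.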
func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *node {
x.Printf("Node ID: %v with GroupID: %v\n", id, gid)
rc := &intern.RaftContext{
Addr: myAddr,
Group: gid,
Id: id,
}
props := proposals{
keys: make(map[string]*proposalCtx),
}
m := conn.NewNode(rc, store)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, id)
n := &node{
Node: m,
requestCh: make(chan linReadReq),
ctx: context.Background(),
gid: gid,
// processConfChange etc. are not throttled, so keep some extra buffer so that
// we don't block Tick when applyCh is full.
applyCh: make(chan raftpb.Entry, Config.NumPendingProposals+1000),
props: props,
stop: make(chan struct{}),
done: make(chan struct{}),
rand: rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
raftIdBuffer: b,
}
return n
}
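// header is a fixed 6-byte prefix carrying a proposal id and a message id.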
type header struct {
proposalId uint32
msgId uint16
}
func (h *header) Length() int {
return 6 // 4 bytes for proposalId, 2 bytes for msgId.
}
func (h *header) Encode() []byte {
result := make([]byte, h.Length())
binary.LittleEndian.PutUint32(result[0:4], h.proposalId)
binary.LittleEndian.PutUint16(result[4:6], h.msgId)
return result
}
func (h *header) Decode(in []byte) {
h.proposalId = binary.LittleEndian.Uint32(in[0:4])
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
// proposeAndWait sends a proposal through Raft. It waits on a channel for the proposal
// to be applied (written to the WAL) on all the nodes in the group.
func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) error {
if n.Raft() == nil {
return x.Errorf("Raft isn't initialized yet")
}
// TODO: Should be based on number of edges (amount of work)
pendingProposals <- struct{}{}
x.PendingProposals.Add(1)
defer func() { <-pendingProposals; x.PendingProposals.Add(-1) }()
if ctx.Err() != nil {
return ctx.Err()
}
// Do a type check here if the schema is present.
// In very rare cases invalid entries might pass through Raft and be persisted,
// so we also do a best-effort schema check while writing.
if proposal.Mutations != nil {
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
} else if tablet == nil || tablet.GroupId != groups().groupId() {
// The tablet can move by the time the request reaches here.
return errUnservedTablet
}
su, ok := schema.State().Get(edge.Attr)
if !ok {
continue
} else if err := ValidateAndConvert(edge, &su); err != nil {
return err
}
}
for _, schema := range proposal.Mutations.Schema {
if tablet := groups().Tablet(schema.Predicate); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
}
if err := checkSchema(schema); err != nil {
return err
}
}
}
che := make(chan error, 1)
pctx := &proposalCtx{
ch: che,
ctx: ctx,
}
key := uniqueKey()
x.AssertTruef(n.props.Store(key, pctx), "Found existing proposal with key: [%v]", key)
proposal.Key = key
slice := make([]byte, proposal.Size())
upto, err := proposal.MarshalTo(slice)
if err != nil {
return err
}
// Some proposals can be stuck if a leader change happens. For example, a MsgProp message
// from a follower to the leader can be dropped or end up with empty Data in CommittedEntries.
// Having a timeout here prevents the mutation from being stuck forever in case the client
// doesn't set one.
cctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
if err = n.Raft().Propose(cctx, slice[:upto]); err != nil {
return x.Wrapf(err, "While proposing")
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Waiting for the proposal.")
select {
case err = <-che:
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Raft Propose error: %v", err)
}
}
case <-cctx.Done():
return fmt.Errorf("While proposing to Raft group, err: %+v\n", cctx.Err())
}
return err
}
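// processEdge applies a single edge mutation for the proposal identified by
// pkey, at raft index ridx, against that proposal's transaction.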
func (n *node) processEdge(ridx uint64, pkey string, edge *intern.DirectedEdge) error {
ctx, txn := n.props.CtxAndTxn(pkey)
if txn.ShouldAbort() {
return dy.ErrConflict
}
rv := x.RaftValue{Group: n.gid, Index: ridx}
ctx = context.WithValue(ctx, "raft", rv)
// Index updates would be wrong if we don't wait.
// Say we do <0x1> <name> "janardhan", then <0x1> <name> "pawan":
// while applying the second mutation we check the old value
// of name and delete it from "janardhan"'s index. If we don't
// wait for commit information, the second mutation won't see the first value.
posting.Oracle().WaitForTs(context.Background(), txn.StartTs)
if err := runMutation(ctx, edge, txn); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("process mutation: %v", err)
return err
}
return nil
}
func (n *node) processSchemaMutations(pkey string, index uint64,
startTs uint64, s *intern.SchemaUpdate) error {
ctx, _ := n.props.CtxAndTxn(pkey)
rv := x.RaftValue{Group: n.gid, Index: index}
ctx = context.WithValue(ctx, "raft", rv)
if err := runSchemaMutation(ctx, s, startTs); err != nil {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf(err.Error())
}
return err
}
return nil
}
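// applyConfChange applies a Raft configuration change: it deletes or connects
// the affected peer and feeds the change back into the Raft state machine.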
func (n *node) applyConfChange(e raftpb.Entry) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
if cc.Type == raftpb.ConfChangeRemoveNode {
n.DeletePeer(cc.NodeID)
} else if len(cc.Context) > 0 {
var rc intern.RaftContext
x.Check(rc.Unmarshal(cc.Context))
n.Connect(rc.Id, rc.Addr)
}
cs := n.Raft().ApplyConfChange(cc)
n.SetConfState(cs)
n.DoneConfChange(cc.ID, nil)
}
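// waitForConflictResolution tries, for a bounded number of attempts, to abort
// all pending transactions touching the given predicate.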
func waitForConflictResolution(attr string) error {
for i := 0; i < 10; i++ {
tctxs := posting.Txns().Iterate(func(key []byte) bool {
pk := x.Parse(key)
return pk.Attr == attr
})
if len(tctxs) == 0 {
return nil
}
tryAbortTransactions(tctxs)
}
return errors.New("Unable to abort transactions")
}
func updateTxns(raftIndex uint64, startTs uint64) *posting.Txn {
txn := &posting.Txn{
StartTs: startTs,
Indices: []uint64{raftIndex},
}
return posting.Txns().PutOrMergeIndex(txn)
}
// We don't support schema mutations across nodes in a transaction.
// Wait for all transactions to either abort or complete; all write transactions
// involving the predicate are aborted until the schema mutation is done.
func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
if proposal.Mutations.DropAll {
// Ensures nothing get written to disk due to commit proposals.
posting.Txns().Reset()
schema.State().DeleteAll()
err := posting.DeleteAll()
posting.TxnMarks().Done(index)
return err
}
if proposal.Mutations.StartTs == 0 {
posting.TxnMarks().Done(index)
return errors.New("StartTs must be provided.")
}
startTs := proposal.Mutations.StartTs
if len(proposal.Mutations.Schema) > 0 {
defer posting.TxnMarks().Done(index)
for _, supdate := range proposal.Mutations.Schema {
// This is necessary to ensure that there is no race between when we start reading
// from badger and a new mutation getting committed via Raft and getting applied.
// Before moving the predicate we would flush everything and wait for the watermark
// to catch up, but there might be some proposals which got proposed but not committed
// yet. It's ok to reject the proposal here; the same would happen on all nodes because
// we would have proposed the membership state, and all nodes would have the proposed
// state or some state after that before reaching here.
if tablet := groups().Tablet(supdate.Predicate); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
}
if err := waitForConflictResolution(supdate.Predicate); err != nil {
return err
}
if err := n.processSchemaMutations(proposal.Key, index, startTs, supdate); err != nil {
return err
}
}
return nil
}
// The scheduler tracks tasks at the subject, predicate level, so handling
// schema mutations here simplifies the design: we needn't worry about
// serializing the mutations per predicate along with schema mutations.
// We derive the schema here if it's not present.
// Since Raft committed logs are serialized, we can derive the
// schema here without any locking.
// schemaMap stores the type of the first mutation seen for each predicate.
schemaMap := make(map[string]types.TypeID)
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
updateTxns(index, proposal.Mutations.StartTs)
return errPredicateMoving
}
if edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star)) {
// We should only have one edge drop in one mutation call.
ctx, _ := n.props.CtxAndTxn(proposal.Key)
defer posting.TxnMarks().Done(index)
if err := waitForConflictResolution(edge.Attr); err != nil {
return err
}
return posting.DeletePredicate(ctx, edge.Attr)
}
// Don't derive the schema when doing deletion.
if edge.Op == intern.DirectedEdge_DEL {
continue
}
if _, ok := schemaMap[edge.Attr]; !ok {
schemaMap[edge.Attr] = posting.TypeID(edge)
}
}
total := len(proposal.Mutations.Edges)
x.ActiveMutations.Add(int64(total))
for attr, storageType := range schemaMap {
if _, err := schema.State().TypeOf(attr); err != nil {
// Schema doesn't exist.
// Since committed entries are serialized, updateSchemaIfMissing is not
// needed. In the future, if the schema needs to be changed, it would flow
// through Raft, so there won't be race conditions between reading and
// updating the schema.
updateSchemaType(attr, storageType, index)
}
}
m := proposal.Mutations
pctx := n.props.pctx(proposal.Key)
pctx.txn = updateTxns(index, m.StartTs)
for _, edge := range m.Edges {
err := posting.ErrRetry
for err == posting.ErrRetry {
err = n.processEdge(index, proposal.Key, edge)
}
if err != nil {
return err
}
}
x.ActiveMutations.Add(-1)
}
return nil
}
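// applyCommitted applies a single committed proposal: mutations, streamed
// key-values, membership state, predicate cleanup, or a transaction
// commit/abort decision.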
func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
if proposal.DeprecatedId != 0 {
proposal.Key = fmt.Sprint(proposal.DeprecatedId)
}
// One final applied and synced watermark would be emitted when proposal ctx ref count
// becomes zero.
pctx := n.props.pctx(proposal.Key)
if pctx == nil {
// This is during replay of logs after restart or on a replica.
pctx = &proposalCtx{
ch: make(chan error, 1),
ctx: n.ctx,
}
// We assert here to make sure that we do add the proposal to the map.
x.AssertTruef(n.props.Store(proposal.Key, pctx),
"Found existing proposal with key: [%v]", proposal.Key)
}
pctx.index = index
// TODO: We should be able to remove this as well.
posting.TxnMarks().Begin(index)
if proposal.Mutations != nil {
// syncmarks for this shouldn't be marked done until it's committed.
return n.applyMutations(proposal, index)
}
defer posting.TxnMarks().Done(index)
if len(proposal.Kv) > 0 {
return n.processKeyValues(proposal.Key, proposal.Kv)
} else if proposal.State != nil {
// This state needn't be snapshotted in this group, on restart we would fetch
// a state which is latest or equal to this.
groups().applyState(proposal.State)
return nil
} else if len(proposal.CleanPredicate) > 0 {
return n.deletePredicate(proposal.Key, proposal.CleanPredicate)
} else if proposal.TxnContext != nil {
return n.commitOrAbort(proposal.Key, proposal.TxnContext)
} else {
x.Fatalf("Unknown proposal")
}
return nil
}
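// processApplyCh consumes committed entries from applyCh, applies them, and
// marks both the proposal and the applied watermark as done.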
func (n *node) processApplyCh() {
for e := range n.applyCh {
proposal := &intern.Proposal{}
if err := proposal.Unmarshal(e.Data); err != nil {
x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
err := n.applyCommitted(proposal, e.Index)
n.props.Done(proposal.Key, err)
n.Applied.Done(e.Index)
}
}
func (n *node) commitOrAbort(pkey string, tctx *api.TxnContext) error {
ctx, _ := n.props.CtxAndTxn(pkey)
_, err := commitOrAbort(ctx, tctx)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Status of commitOrAbort %+v %v\n", tctx, err)
}
if err == nil {
posting.Txns().Done(tctx.StartTs)
}
return err
}
func (n *node) deletePredicate(pkey string, predicate string) error {
ctx, _ := n.props.CtxAndTxn(pkey)
return posting.DeletePredicate(ctx, predicate)
}
func (n *node) processKeyValues(pkey string, kvs []*intern.KV) error {
ctx, _ := n.props.CtxAndTxn(pkey)
return populateKeyValues(ctx, kvs)
}
func (n *node) applyAllMarks(ctx context.Context) {
// Get index of last committed.
lastIndex := n.Applied.LastIndex()
n.Applied.WaitForMark(ctx, lastIndex)
}
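// leaderBlocking returns a connection pool to the group leader. If no leader
// is known yet, it refreshes membership state from Zero and returns an error.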
func (n *node) leaderBlocking() (*conn.Pool, error) {
pool := groups().Leader(groups().groupId())
if pool == nil {
// Functions like retrieveSnapshot and joinPeers are blocking at initial start and
// leader election for a group might not have happened when it is called. If we can't
// find a leader, get latest state from Zero.
if err := UpdateMembershipState(context.Background()); err != nil {
return nil, fmt.Errorf("Error while trying to update membership state: %+v", err)
}
return nil, fmt.Errorf("Unable to reach leader in group %d", n.gid)
}
return pool, nil
}
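// retrieveSnapshot streams the group's data from the leader directly into the
// local store and then reloads the schema from it.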
func (n *node) retrieveSnapshot() error {
pool, err := n.leaderBlocking()
if err != nil {
return err
}
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
// Safe to keep this line
n.applyAllMarks(n.ctx)
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
if _, err := n.populateShard(pstore, pool); err != nil {
return fmt.Errorf("Cannot retrieve snapshot from peer, error: %v\n", err)
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
if err := schema.LoadFromDb(); err != nil {
return fmt.Errorf("Error while initilizating schema: %+v\n", err)
}
groups().triggerMembershipSync()
return nil
}
type linReadReq struct {
// A one-shot chan on which we send a raft index.
indexCh chan<- uint64
}
func (n *node) readIndex(ctx context.Context) (chan uint64, error) {
ch := make(chan uint64, 1)
select {
case n.requestCh <- linReadReq{ch}:
return ch, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
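// runReadIndexLoop batches queued linearizable read requests, issues a single
// ReadIndex call per batch, and sends the resulting index (or raft.None on
// failure or timeout) back to every waiter.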
func (n *node) runReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
requests := []linReadReq{}
// We maintain one linearizable ReadIndex request at a time. Others wait queued behind
// requestCh.
for {
select {
case <-closer.HasBeenClosed():
return
case <-readStateCh:
// Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
case req := <-n.requestCh:
slurpLoop:
for {
requests = append(requests, req)
select {
case req = <-n.requestCh:
default:
break slurpLoop
}
}
activeRctx := make([]byte, 8)
x.Check2(n.rand.Read(activeRctx[:]))
// To see if the ReadIndex request succeeds, we need to use a timeout and wait for a
// successful response. If we don't see one, the raft leader wasn't configured, or the
// raft leader didn't respond.
// This is supposed to use context.Background(). We don't want to cancel the timer
// externally. We want equivalent functionality to time.NewTimer.
// TODO: Second is high, if a node gets partitioned we would have to throw error sooner.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := n.Raft().ReadIndex(ctx, activeRctx[:])
if err != nil {
for _, req := range requests {
req.indexCh <- raft.None
}
continue
}
again:
select {
case <-closer.HasBeenClosed():
cancel()
return
case rs := <-readStateCh:
if 0 != bytes.Compare(activeRctx[:], rs.RequestCtx) {
goto again
}
cancel()
index := rs.Index
for _, req := range requests {
req.indexCh <- index
}
case <-ctx.Done():
for _, req := range requests {
req.indexCh <- raft.None
}
}
requests = requests[:0]
}
}
}
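// Run is the main Raft loop: it ticks the state machine, forwards read states,
// sends messages, persists hard state and entries, feeds committed entries to
// applyCh, and handles snapshots and shutdown.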
func (n *node) Run() {
firstRun := true
var leader bool
// See also our configuration of HeartbeatTick and ElectionTick.
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
rcBytes, err := n.RaftContext.Marshal()
x.Check(err)
// Ensure we don't exit unless any snapshot in progress is done.
closer := y.NewCloser(2)
go n.snapshotPeriodically(closer)
// This chan could have capacity zero, because runReadIndexLoop never blocks without selecting
// on readStateCh. It's 2 so that sending rarely blocks (so the Go runtime doesn't have to
// switch threads as much.)
readStateCh := make(chan raft.ReadState, 2)
// We only stop runReadIndexLoop after the for loop below has finished interacting with it.
// That way we know sending to readStateCh will not deadlock.
go n.runReadIndexLoop(closer, readStateCh)
elog := trace.NewEventLog("Dgraph", "RunLoop")
defer elog.Finish()
var timer x.Timer
for {
select {
case <-ticker.C:
n.Raft().Tick()
case rd := <-n.Raft().Ready():
timer.Start()
for _, rs := range rd.ReadStates {
readStateCh <- rs
}
if rd.SoftState != nil {
groups().triggerMembershipSync()
leader = rd.RaftState == raft.StateLeader
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
msg.Context = rcBytes
n.Send(msg)
}
}
timer.Record() // Index 0.
// Store the hardstate and entries.
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
timer.Record() // Index 1.
if !raft.IsEmptySnap(rd.Snapshot) {
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
var rc intern.RaftContext
x.Check(rc.Unmarshal(rd.Snapshot.Data))
x.AssertTrue(rc.Group == n.gid)
if rc.Id != n.Id {
// NOTE: Retrieving snapshot here is OK, after storing it above in WAL, because
// rc.Id != n.Id.
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
// It's ok to block tick while retrieving snapshot, since it's a follower
n.retryUntilSuccess(n.retrieveSnapshot, 100*time.Millisecond)
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
}
timer.Record() // Index 2.
lc := len(rd.CommittedEntries)
if lc > 0 {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Found %d committed entries", len(rd.CommittedEntries))
}
}
// Now schedule or apply committed entries.
for idx, entry := range rd.CommittedEntries {
// Need applied watermarks for schema mutations also, for read linearizability.
// Applied watermarks need to be emitted as soon as possible, sequentially.
// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false},
// then doneUntil would be set to 4 as soon as Mark{4, true} is done, before
// Mark{3, false} is emitted. So it's safer to emit watermarks sequentially,
// as soon as possible.
n.Applied.Begin(entry.Index)
if entry.Type == raftpb.EntryConfChange {
n.applyConfChange(entry)
// Not present in proposal map.
n.Applied.Done(entry.Index)
groups().triggerMembershipSync()
} else if len(entry.Data) == 0 {
// TODO: Say something. Do something.
n.Applied.Done(entry.Index)
} else {
// When applyCh fills up, this would automatically block.
n.applyCh <- entry
}
// Move to debug log later.
// Sometimes after restart there are too many entries to replay, so log so that we
// know Run loop is replaying them.
if lc > 1e5 && idx%5000 == 0 {
x.Printf("In run loop applying committed entries, idx: [%v], pending: [%v]\n",
idx, lc-idx)
}
}
timer.Record() // Index 3.
if lc > 1e5 {
x.Println("All committed entries sent to applyCh.")
}
if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
msg.Context = rcBytes
n.Send(msg)
}
}
n.Raft().Advance()
if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
firstRun = false
}
timer.Record() // Index 4.
if total := timer.Total(); total >= 100*time.Millisecond {
elog.Printf("Timer Total: %v. All: %+v.", total, timer.All())
}
case <-n.stop:
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
go func() {
select {
case <-n.ctx.Done(): // time out
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("context timed out while transfering leadership")
}
case <-time.After(time.Second): // Don't wait indefinitely for the transfer.
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Timed out transfering leadership")
}
}
n.Raft().Stop()
closer.SignalAndWait()
close(n.done)
}()
} else {
n.Raft().Stop()
closer.SignalAndWait()
close(n.done)
}
case <-n.done:
return
}
}
}
func (n *node) Stop() {
select {
case n.stop <- struct{}{}:
case <-n.done:
// already stopped.
return
}
<-n.done // wait for Run to respond.
}
func (n *node) snapshotPeriodically(closer *y.Closer) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Some proposals like predicate move can consume around 32MB per proposal, so keeping
// too many proposals would increase the memory usage so snapshot as soon as
// possible
n.snapshot(10)
case <-closer.HasBeenClosed():
closer.Done()
return
}
}
}
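// abortOldTransactions asks Zero to try aborting the oldest pending
// transactions so that the transaction watermark can catch up.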
func (n *node) abortOldTransactions(pending uint64) {
pl := groups().Leader(0)
if pl == nil {
return
}
zc := intern.NewZeroClient(pl.Get())
// Aborts if not already committed.
startTimestamps := posting.Txns().TxnsSinceSnapshot(pending)
req := &intern.TxnTimestamps{Ts: startTimestamps}
zc.TryAbort(context.Background(), req)
}
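// snapshot creates a Raft snapshot at the transaction watermark minus skip.
// If the watermark hasn't advanced far enough past the previous snapshot, it
// may instead try to abort old transactions.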
func (n *node) snapshot(skip uint64) {
txnWatermark := posting.TxnMarks().DoneUntil()
existing, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
lastSnapshotIdx := existing.Metadata.Index
if txnWatermark <= lastSnapshotIdx+skip {
appliedWatermark := n.Applied.DoneUntil()
// If difference grows above 1.5 * ForceAbortDifference we try to abort old transactions
if appliedWatermark-txnWatermark > 1.5*x.ForceAbortDifference && skip != 0 {
// Print warning if difference grows above 3 * x.ForceAbortDifference. Shouldn't ideally
// happen as we abort oldest 20% when it grows above 1.5 times.
if appliedWatermark-txnWatermark > 3*x.ForceAbortDifference {
x.Printf("Couldn't take snapshot, txn watermark: [%d], applied watermark: [%d]\n",
txnWatermark, appliedWatermark)
}
// Try aborting pending transactions here.
n.abortOldTransactions(appliedWatermark - txnWatermark)
}
return
}
snapshotIdx := txnWatermark - skip
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Taking snapshot for group: %d at watermark: %d\n", n.gid, snapshotIdx)
}
rc, err := n.RaftContext.Marshal()
x.Check(err)
err = n.Store.CreateSnapshot(snapshotIdx, n.ConfState(), rc)
x.Check(err)
x.Printf("Writing snapshot at index: %d, applied mark: %d\n", snapshotIdx,
n.Applied.DoneUntil())
}
func (n *node) joinPeers() error {
pl, err := n.leaderBlocking()
if err != nil {
return err
}
gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling JoinCluster via leader: %s", pl.Addr)
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
return x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with JoinCluster call\n")
}
// isMember asks the leader of the group whether this node is one of its peers.
func (n *node) isMember() (bool, error) {
pl, err := n.leaderBlocking()
if err != nil {
return false, err
}
gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling IsPeer")
pr, err := c.IsPeer(n.ctx, n.RaftContext)
if err != nil {
return false, x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with IsPeer call\n")
return pr.Status, nil
}
func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {
var err error
for {
if err = fn(); err == nil {
break
}
x.Printf("Error while calling fn: %v. Retrying...\n", err)
time.Sleep(pause)
}
}
// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode() {
idx, restart, err := n.PastLife()
x.Check(err)
posting.TxnMarks().SetDoneUntil(idx)
if _, hasPeer := groups().MyPeer(); !restart && hasPeer {
// The node has other peers, it might have crashed after joining the cluster and before
// writing a snapshot. Check from leader, if it is part of the cluster. Consider this a
// restart if it is part of the cluster, else start a new node.
for {
if restart, err = n.isMember(); err == nil {
break
}
x.Printf("Error while calling hasPeer: %v. Retrying...\n", err)
time.Sleep(time.Second)
}
}
x.Printf("Restarting node for group: %d\n", n.gid)
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
if !raft.IsEmptySnap(sp) {
members := groups().members(n.gid)
for _, id := range sp.Metadata.ConfState.Nodes {
n.Connect(id, members[id].Addr)
}
}
n.SetRaft(raft.RestartNode(n.Cfg))