/*
 * This file is available under the Apache License, Version 2.0,
 * with the Commons Clause restriction.
 */
package worker
import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"golang.org/x/net/context"
	"golang.org/x/net/trace"

	"github.com/dgraph-io/dgo/protos/api"
	dy "github.com/dgraph-io/dgo/y"
	"github.com/dgraph-io/dgraph/conn"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/intern"
	"github.com/dgraph-io/dgraph/raftwal"
	"github.com/dgraph-io/dgraph/schema"
	"github.com/dgraph-io/dgraph/types"
	"github.com/dgraph-io/dgraph/x"
)
type proposalCtx struct {
ch chan error
ctx context.Context
// Since each proposal consists of multiple tasks we need to store
// non-nil error returned by task
err error
	index uint64 // RAFT index for the proposal.
	// Used for writing all deltas at end
	txn *posting.Txn
}
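// proposals tracks in-flight Raft proposals by key. Typical lifecycle, as
// inferred from usage in this file: proposeAndWait Stores a proposalCtx under a
// unique key, proposes the data to Raft, and blocks on pctx.ch; the apply path
// calls Done(key, err) to signal completion, and the proposer Deletes the key
// on return.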
type proposals struct {
	sync.RWMutex
	all map[string]*proposalCtx
}
// uniqueKey is meant to be unique across all the replicas.
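// It combines this node's ID with a random uint64, e.g. "01-5577006791947779410"
// (illustrative value).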
func uniqueKey() string {
return fmt.Sprintf("%02d-%d", groups().Node.Id, groups().Node.Rand.Uint64())
}
func (p *proposals) Store(key string, pctx *proposalCtx) bool {
	p.Lock()
	defer p.Unlock()
	if _, has := p.all[key]; has {
		return false
	}
	p.all[key] = pctx
	return true
}
func (p *proposals) Delete(key string) {
if len(key) == 0 {
return
}
p.Lock()
defer p.Unlock()
delete(p.all, key)
}
func (p *proposals) pctx(key string) *proposalCtx {
	p.RLock()
	defer p.RUnlock()
	if pctx := p.all[key]; pctx != nil {
		return pctx
	}
	return new(proposalCtx)
}
func (p *proposals) CtxAndTxn(key string) (context.Context, *posting.Txn) {
	p.RLock()
	defer p.RUnlock()
	pd, has := p.all[key]
	if !has {
		// See the race condition note in Done.
		return context.Background(), new(posting.Txn)
	}
	return pd.ctx, pd.txn
}
func (p *proposals) Done(key string, err error) {
if len(key) == 0 {
return
}
	p.Lock()
	defer p.Unlock()
	pd, has := p.all[key]
	if !has {
		// If we assert here, there would be a race condition between a context
		// timing out, and a proposal getting applied immediately after. That
		// would cause assert to fail. So, don't assert.
		return
	}
	if err != nil {
		pd.err = err
	}
	pd.ch <- pd.err
}
type node struct {
*conn.Node
// Fields which are never changed after init.
applyCh chan raftpb.Entry
ctx context.Context
stop chan struct{} // to send the stop signal to Run
done chan struct{} // to check whether node is running or not
gid uint32
props proposals
canCampaign bool
elog trace.EventLog
}
func (n *node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
if read == nil {
return nil
}
// TODO: Now that we apply txn updates via Raft, waiting based on Txn timestamps is sufficient.
// It ensures that we have seen all applied mutations before a txn commit proposal is applied.
// if read.Sequencing == api.LinRead_SERVER_SIDE {
// return n.WaitLinearizableRead(ctx)
// }
if read.Ids == nil {
return nil
}
gid := n.RaftContext.Group
min := read.Ids[gid]
	return n.Applied.WaitForMark(ctx, min)
}
func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *node {
x.Printf("Node ID: %v with GroupID: %v\n", id, gid)
	rc := &intern.RaftContext{
		Addr:  myAddr,
		Group: gid,
		Id:    id,
	}
	m := conn.NewNode(rc, store)
props := proposals{
all: make(map[string]*proposalCtx),
}
n := &node{
Node: m,
ctx: context.Background(),
gid: gid,
// processConfChange etc are not throttled so some extra delta, so that we don't
// block tick when applyCh is full
applyCh: make(chan raftpb.Entry, Config.NumPendingProposals+1000),
props: props,
stop: make(chan struct{}),
done: make(chan struct{}),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
}
return n
}
type header struct {
proposalId uint32
msgId uint16
}
func (h *header) Length() int {
return 6 // 4 bytes for proposalId, 2 bytes for msgId.
}
func (h *header) Encode() []byte {
result := make([]byte, h.Length())
binary.LittleEndian.PutUint32(result[0:4], h.proposalId)
binary.LittleEndian.PutUint16(result[4:6], h.msgId)
return result
}
func (h *header) Decode(in []byte) {
h.proposalId = binary.LittleEndian.Uint32(in[0:4])
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
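// Illustrative round trip for header (not part of the original source):
//
//	h := header{proposalId: 7, msgId: 2}
//	var out header
//	out.Decode(h.Encode()) // out now equals h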
var errInternalRetry = errors.New("Retry Raft proposal internally")
// proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal
// to be applied (written to WAL) to all the nodes in the group.
func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) error {
	if n.Raft() == nil {
		return x.Errorf("Raft isn't initialized yet")
	}
	// TODO: Should be based on number of edges (amount of work)
	pendingProposals <- struct{}{}
	x.PendingProposals.Add(1)
	defer func() { <-pendingProposals; x.PendingProposals.Add(-1) }()
if ctx.Err() != nil {
return ctx.Err()
}
// Do a type check here if schema is present
// In very rare cases invalid entries might pass through raft, which would
// be persisted, we do best effort schema check while writing
if proposal.Mutations != nil {
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
} else if tablet.GroupId != groups().groupId() {
// Tablet can move by the time request reaches here.
return errUnservedTablet
}
su, ok := schema.State().Get(edge.Attr)
			if !ok {
				continue
			} else if err := ValidateAndConvert(edge, &su); err != nil {
				return err
			}
		}
		for _, schema := range proposal.Mutations.Schema {
			if tablet := groups().Tablet(schema.Predicate); tablet != nil && tablet.ReadOnly {
				return errPredicateMoving
			}
			if err := checkSchema(schema); err != nil {
				return err
			}
		}
	}

	propose := func() error {
cctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
che := make(chan error, 1)
pctx := &proposalCtx{
ch: che,
ctx: cctx,
}
key := uniqueKey()
x.AssertTruef(n.props.Store(key, pctx), "Found existing proposal with key: [%v]", key)
defer n.props.Delete(key) // Ensure that it gets deleted on return.
proposal.Key = key
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Proposing data with key: %s", key)
}
data, err := proposal.Marshal()
if err != nil {
return err
}
if err = n.Raft().Propose(cctx, data); err != nil {
return x.Wrapf(err, "While proposing")
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Waiting for the proposal.")
}
select {
case err = <-che:
// We arrived here by a call to n.props.Done().
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Done with error: %v", err)
}
return err
case <-ctx.Done():
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("External context timed out with error: %v.", ctx.Err())
return ctx.Err()
case <-cctx.Done():
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Internal context timed out with error: %v. Retrying...", cctx.Err())
}
return errInternalRetry
		}
	}
// Some proposals can be stuck if leader change happens. For e.g. MsgProp message from follower
// to leader can be dropped/end up appearing with empty Data in CommittedEntries.
// Having a timeout here prevents the mutation being stuck forever in case they don't have a
// timeout. We should always try with a timeout and optionally retry.
err := errInternalRetry
for err == errInternalRetry {
err = propose()
}
	return err
}
func (n *node) processEdge(ridx uint64, pkey string, edge *intern.DirectedEdge) error {
ctx, txn := n.props.CtxAndTxn(pkey)
if txn.ShouldAbort() {
		return dy.ErrConflict
	}
rv := x.RaftValue{Group: n.gid, Index: ridx}
ctx = context.WithValue(ctx, "raft", rv)
// Index updates would be wrong if we don't wait.
// Say we do <0x1> <name> "janardhan", <0x1> <name> "pawan",
// while applying the second mutation we check the old value
// of name and delete it from "janardhan"'s index. If we don't
// wait for commit information then mutation won't see the value
// We used to Oracle().WaitForTs here.
// TODO: Need to ensure that keys which hold values can only keep one pending txn at a
// time. Otherwise, their index generated would be wrong.
if err := runMutation(ctx, edge, txn); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("process mutation: %v", err)
return err
}
return nil
}
func (n *node) processSchemaMutations(pkey string, index uint64,
startTs uint64, s *intern.SchemaUpdate) error {
ctx, _ := n.props.CtxAndTxn(pkey)
rv := x.RaftValue{Group: n.gid, Index: index}
ctx = context.WithValue(ctx, "raft", rv)
if err := runSchemaMutation(ctx, s, startTs); err != nil {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf(err.Error())
}
return err
}
return nil
}
func (n *node) applyConfChange(e raftpb.Entry) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
if cc.Type == raftpb.ConfChangeRemoveNode {
n.DeletePeer(cc.NodeID)
} else if len(cc.Context) > 0 {
		var rc intern.RaftContext
		x.Check(rc.Unmarshal(cc.Context))
n.Connect(rc.Id, rc.Addr)
}
cs := n.Raft().ApplyConfChange(cc)
n.SetConfState(cs)
	n.DoneConfChange(cc.ID, nil)
}
func waitForConflictResolution(attr string) error {
for i := 0; i < 10; i++ {
tctxs := posting.Txns().Iterate(func(key []byte) bool {
pk := x.Parse(key)
return pk.Attr == attr
})
if len(tctxs) == 0 {
			return nil
		}
		tryAbortTransactions(tctxs)
}
return errors.New("Unable to abort transactions")
}
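// updateTxns associates the given Raft index with the transaction for startTs,
// via posting.Txns().PutOrMergeIndex.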
func updateTxns(raftIndex uint64, startTs uint64) *posting.Txn {
txn := &posting.Txn{
StartTs: startTs,
Indices: []uint64{raftIndex},
}
return posting.Txns().PutOrMergeIndex(txn)
}
// We don't support schema mutations across nodes in a transaction.
// Wait for all pending transactions to either abort or complete; all write
// transactions involving the predicate are aborted until the schema mutation is done.
func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
if proposal.Mutations.DropAll {
// Ensures nothing get written to disk due to commit proposals.
posting.Txns().Reset()
schema.State().DeleteAll()
err := posting.DeleteAll()
return err
}
if proposal.Mutations.StartTs == 0 {
return errors.New("StartTs must be provided.")
}
startTs := proposal.Mutations.StartTs
if len(proposal.Mutations.Schema) > 0 {
for _, supdate := range proposal.Mutations.Schema {
			// This is necessary to ensure that there is no race between when we start reading
			// from badger and a new mutation getting committed via raft and getting applied.
			// Before moving the predicate we would flush all and wait for the watermark to catch up,
			// but there might be some proposals which got proposed but not committed yet.
// It's ok to reject the proposal here and same would happen on all nodes because we
// would have proposed membershipstate, and all nodes would have the proposed state
// or some state after that before reaching here.
if tablet := groups().Tablet(supdate.Predicate); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
}
if err := waitForConflictResolution(supdate.Predicate); err != nil {
return err
}
if err := n.processSchemaMutations(proposal.Key, index, startTs, supdate); err != nil {
return err
}
		}
		return nil
	}
// Scheduler tracks tasks at subject, predicate level, so doing
// schema stuff here simplifies the design and we needn't worry about
// serializing the mutations per predicate or schema mutations
// We derive the schema here if it's not present
// Since raft committed logs are serialized, we can derive
// schema here without any locking
// stores a map of predicate and type of first mutation for each predicate
schemaMap := make(map[string]types.TypeID)
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
updateTxns(index, proposal.Mutations.StartTs)
return errPredicateMoving
}
if edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star)) {
// We should only have one edge drop in one mutation call.
ctx, _ := n.props.CtxAndTxn(proposal.Key)
if err := waitForConflictResolution(edge.Attr); err != nil {
return err
}
return posting.DeletePredicate(ctx, edge.Attr)
}
		// Don't derive schema when doing deletion.
if edge.Op == intern.DirectedEdge_DEL {
continue
}
if _, ok := schemaMap[edge.Attr]; !ok {
schemaMap[edge.Attr] = posting.TypeID(edge)
}
}
total := len(proposal.Mutations.Edges)
x.ActiveMutations.Add(int64(total))
for attr, storageType := range schemaMap {
if _, err := schema.State().TypeOf(attr); err != nil {
// Schema doesn't exist
// Since committed entries are serialized, updateSchemaIfMissing is not
// needed, In future if schema needs to be changed, it would flow through
// raft so there won't be race conditions between read and update schema
			updateSchemaType(attr, storageType, index)
		}
	}

	m := proposal.Mutations
	pctx := n.props.pctx(proposal.Key)
pctx.txn = updateTxns(index, m.StartTs)
for _, edge := range m.Edges {
err := posting.ErrRetry
for err == posting.ErrRetry {
err = n.processEdge(index, proposal.Key, edge)
}
if err != nil {
return err
}
x.ActiveMutations.Add(-1)
}
return nil
}
func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
if proposal.DeprecatedId != 0 {
proposal.Key = fmt.Sprint(proposal.DeprecatedId)
}
// One final applied and synced watermark would be emitted when proposal ctx ref count
// becomes zero.
pctx := n.props.pctx(proposal.Key)
if pctx == nil {
// This is during replay of logs after restart or on a replica.
pctx = &proposalCtx{
ch: make(chan error, 1),
			ctx: n.ctx,
		}
		// We assert here to make sure that we do add the proposal to the map.
x.AssertTruef(n.props.Store(proposal.Key, pctx),
"Found existing proposal with key: [%v]", proposal.Key)
}
pctx.index = index
if proposal.Mutations != nil {
		// syncmarks for this shouldn't be marked done until it's committed.
n.elog.Printf("Applying mutations for key: %s", proposal.Key)
return n.applyMutations(proposal, index)
}
if len(proposal.Kv) > 0 {
ctx, _ := n.props.CtxAndTxn(proposal.Key)
return populateKeyValues(ctx, proposal.Kv)
} else if proposal.State != nil {
n.elog.Printf("Applying state for key: %s", proposal.Key)
// This state needn't be snapshotted in this group, on restart we would fetch
// a state which is latest or equal to this.
groups().applyState(proposal.State)
return nil
} else if len(proposal.CleanPredicate) > 0 {
ctx, _ := n.props.CtxAndTxn(proposal.Key)
return posting.DeletePredicate(ctx, proposal.CleanPredicate)
} else if proposal.DeprecatedTxnContext != nil {
n.elog.Printf("Applying txncontext for key: %s", proposal.Key)
delta := &intern.OracleDelta{}
tctx := proposal.DeprecatedTxnContext
if tctx.CommitTs == 0 {
delta.Aborts = append(delta.Aborts, tctx.StartTs)
} else {
delta.Commits = make(map[uint64]uint64)
delta.Commits[tctx.StartTs] = tctx.CommitTs
}
return n.commitOrAbort(proposal.Key, delta)
} else if proposal.Delta != nil {
n.elog.Printf("Applying Oracle Delta for key: %s", proposal.Key)
return n.commitOrAbort(proposal.Key, proposal.Delta)
} else if proposal.Snapshot != nil {
snap := proposal.Snapshot
n.elog.Printf("Creating snapshot: %+v", snap)
x.Printf("Creating snapshot at index: %d. MinPendingStartTs: %d.\n",
snap.Index, snap.MinPendingStartTs)
data, err := snap.Marshal()
x.Check(err)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
} else {
x.Fatalf("Unknown proposal")
}
return nil
}
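// processApplyCh drains the committed Raft entries queued up by Run, applies
// each proposal, and then marks it done on both the proposal map and the
// Applied watermark.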
func (n *node) processApplyCh() {
for e := range n.applyCh {
proposal := &intern.Proposal{}
if err := proposal.Unmarshal(e.Data); err != nil {
x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
err := n.applyCommitted(proposal, e.Index)
n.elog.Printf("Applied proposal with key: %s, index: %d. Err: %v", proposal.Key, e.Index, err)
n.props.Done(proposal.Key, err)
n.Applied.Done(e.Index)
}
}
func (n *node) commitOrAbort(pkey string, delta *intern.OracleDelta) error {
ctx, _ := n.props.CtxAndTxn(pkey)
applyTxnStatus := func(startTs, commitTs uint64) {
var err error
for i := 0; i < 3; i++ {
err = commitOrAbort(ctx, startTs, commitTs)
if err == nil || err == posting.ErrInvalidTxn {
break
}
x.Printf("Error while applying txn status (%d -> %d): %v", startTs, commitTs, err)
}
// TODO: Even after multiple tries, if we're unable to apply the status of a transaction,
// what should we do? Maybe do a printf, and let them know that there might be a disk issue.
posting.Txns().Done(startTs)
posting.Oracle().Done(startTs)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Status of commitOrAbort startTs %d: %v\n", startTs, err)
		}
	}

	for startTs, commitTs := range delta.GetCommits() {
		applyTxnStatus(startTs, commitTs)
	}
	for _, startTs := range delta.GetAborts() {
applyTxnStatus(startTs, 0)
}
// TODO: Use MaxPending to track the txn watermark. That's the only thing we need really.
// delta.GetMaxPending
posting.Oracle().ProcessOracleDelta(delta)
	return nil
}
func (n *node) applyAllMarks(ctx context.Context) {
// Get index of last committed.
lastIndex := n.Applied.LastIndex()
n.Applied.WaitForMark(ctx, lastIndex)
}
func (n *node) leaderBlocking() (*conn.Pool, error) {
pool := groups().Leader(groups().groupId())
if pool == nil {
// Functions like retrieveSnapshot and joinPeers are blocking at initial start and
// leader election for a group might not have happened when it is called. If we can't
// find a leader, get latest state from
// Zero.
if err := UpdateMembershipState(context.Background()); err != nil {
return nil, fmt.Errorf("Error while trying to update membership state: %+v", err)
}
return nil, fmt.Errorf("Unable to reach leader in group %d", n.gid)
}
return pool, nil
}
func (n *node) retrieveSnapshot() error {
pool, err := n.leaderBlocking()
if err != nil {
		return err
	}
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
// Safe to keep this line
n.applyAllMarks(n.ctx)
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
//
// We can safely evict posting lists from memory. Because, all the updates corresponding to txn
// commits up until then have already been written to pstore. And the way we take snapshots, we
// keep all the pre-writes for a pending transaction, so they will come back to memory, as Raft
// logs are replayed.
if _, err := n.populateShard(pstore, pool); err != nil {
return fmt.Errorf("Cannot retrieve snapshot from peer, error: %v\n", err)
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
if err := schema.LoadFromDb(); err != nil {
return fmt.Errorf("Error while initilizating schema: %+v\n", err)
}
groups().triggerMembershipSync()
	return nil
}
func (n *node) Run() {
	firstRun := true
	var leader bool
	// See also our configuration of HeartbeatTick and ElectionTick.
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	slowTicker := time.NewTicker(time.Minute)
	defer slowTicker.Stop()
for {
select {
n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if leader {
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.calculateSnapshot(1000); err != nil {
x.Errorf("While taking snapshot: %v\n", err)
}
go n.abortOldTransactions()
}
		case <-ticker.C:
			n.Raft().Tick()
case rd := <-n.Raft().Ready():
var tr trace.Trace
if len(rd.Entries) > 0 || !raft.IsEmptySnap(rd.Snapshot) || !raft.IsEmptyHardState(rd.HardState) {
// Optionally, trace this run.
tr = trace.New("Dgraph", "RunLoop")
}
			if rd.SoftState != nil {
groups().triggerMembershipSync()
leader = rd.RaftState == raft.StateLeader
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
n.Send(msg)
				}
			}
if tr != nil {
tr.LazyPrintf("Handled ReadStates and SoftState.")
}
// Store the hardstate and entries. Note that these are not CommittedEntries.
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
if tr != nil {
tr.LazyPrintf("Saved %d entries. Snapshot, HardState empty? (%v, %v)",
len(rd.Entries),
raft.IsEmptySnap(rd.Snapshot),
raft.IsEmptyHardState(rd.HardState))
}
if !raft.IsEmptySnap(rd.Snapshot) {
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
var snap intern.Snapshot
x.Check(snap.Unmarshal(rd.Snapshot.Data))
rc := snap.GetContext()
x.AssertTrue(rc.GetGroup() == n.gid)
if rc.Id != n.Id {
// NOTE: Retrieving snapshot here is OK, after storing it above in WAL, because
// rc.Id != n.Id.
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
// It's ok to block tick while retrieving snapshot, since it's a follower
n.retryUntilSuccess(n.retrieveSnapshot, 100*time.Millisecond)
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
Manish R Jain
committed
}
if tr != nil {
tr.LazyPrintf("Applied or retrieved snapshot.")
}
// Now schedule or apply committed entries.
for idx, entry := range rd.CommittedEntries {
				// Need applied watermarks for schema mutation also for read linearizability.
// Applied watermarks needs to be emitted as soon as possible sequentially.
// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false}
// then doneUntil would be set as 4 as soon as Mark{4,true} is done and before
// Mark{3, false} is emitted. So it's safer to emit watermarks as soon as
// possible sequentially
n.Applied.Begin(entry.Index)
if entry.Type == raftpb.EntryConfChange {
n.applyConfChange(entry)
// Not present in proposal map.
n.Applied.Done(entry.Index)
groups().triggerMembershipSync()
} else if len(entry.Data) == 0 {
// TODO: Say something. Do something.
tr.LazyPrintf("Found empty data at index: %d", entry.Index)
					tr.SetError()
					n.Applied.Done(entry.Index)
				} else {
					// When applyCh fills up, this would automatically block.
n.applyCh <- entry
}
// Move to debug log later.
// Sometimes after restart there are too many entries to replay, so log so that we
// know Run loop is replaying them.
if tr != nil && idx%5000 == 4999 {
tr.LazyPrintf("Handling committed entries. At idx: [%v]\n", idx)
				}
			}
if tr != nil {
tr.LazyPrintf("Handled %d committed entries.", len(rd.CommittedEntries))
}
if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
n.Send(msg)
				}
			}
if tr != nil {
tr.LazyPrintf("Follower queued messages.")
}
			n.Raft().Advance()
if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
				firstRun = false
			}
if tr != nil {
tr.LazyPrintf("Advanced Raft. Done.")
tr.Finish()
}
		case <-n.stop:
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
go func() {
select {
case <-n.ctx.Done(): // time out
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("context timed out while transfering leadership")
}
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Timed out transfering leadership")
}
}
n.Raft().Stop()
close(n.done)
}()
} else {
n.Raft().Stop()
close(n.done)
}
case <-n.done:
return
}
}
}
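// Stop signals the Run loop over n.stop and waits for Run to close n.done.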
func (n *node) Stop() {
select {
case n.stop <- struct{}{}:
case <-n.done:
// already stopped.
return
}
<-n.done // wait for Run to respond.
}
// abortOldTransactions would find txns which have done pre-writes, but have been pending for a
// while. The time that is used is based on the last pre-write seen, so if a txn is doing a
// pre-write multiple times, we'll pick the timestamp of the last pre-write. Thus, this function
// would only act on the txns which have not been active in the last N minutes, and send them for
// abort. Note that only the leader runs this function.
// NOTE: We might need to get the results of TryAbort and propose them. But, it's unclear if we need
// to, because Zero should stream out the aborts anyway.
func (n *node) abortOldTransactions() {
pl := groups().Leader(0)
if pl == nil {
return
}
zc := intern.NewZeroClient(pl.Get())
// Aborts if not already committed.
startTimestamps := posting.Oracle().TxnOlderThan(5 * time.Minute)
if len(startTimestamps) == 0 {
return
}
req := &intern.TxnTimestamps{Ts: startTimestamps}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := zc.TryAbort(ctx, req)
x.Printf("Aborted txns with start ts: %v. Error: %v\n", startTimestamps, err)
}
// calculateSnapshot would calculate a snapshot index, considering these factors:
// - We still keep at least keepN number of Raft entries. If we cut too short,
// then the chances that a crashed follower needs to retrieve the entire state
// from leader increases. So, we keep a buffer to allow a follower to catch up.
// - We can discard at least half of keepN number of entries.
// - We are not overshooting the max applied entry. That is, we're not removing
// Raft entries before they get applied.
// - We are considering the minimum start ts that has yet to be committed or
// aborted. This way, we still keep all the mutations corresponding to this
// start ts in the Raft logs. This is important, because we don't persist
// pre-writes to disk in pstore.
// - Finally, this function would propose this snapshot index, so the entire
// group can apply it to their Raft stores.
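// For example (illustrative numbers, not from the source): with keepN=1000 and
// 2600 entries in the store, discard=1600, so the candidate snapshot index is
// first+1600, further limited by the applied watermark and by the first entry
// whose mutation StartTs >= MinPendingStartTs.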
func (n *node) calculateSnapshot(keepN int) error {
tr := trace.New("Dgraph.Internal", "Propose.Snapshot")
defer tr.Finish()
// Each member of the Raft group is taking snapshots independently, as mentioned in Section 7 of
// Raft paper. We want to avoid taking snapshots too close to the LastIndex, so that in case the
// leader changes, and the followers haven't yet caught up to this index, they would need to
// retrieve the entire state (snapshot) of the DB. So, we should always wait to accumulate skip
// entries before we start taking a snapshot.
count, err := n.Store.NumEntries()
if err != nil {
tr.LazyPrintf("Error: %v", err)
tr.SetError()
return err
}
tr.LazyPrintf("Found Raft entries: %d", count)
if count < 2*keepN {
// We wait to build up at least 2*keepN entries, and then discard keepN entries.
tr.LazyPrintf("Skipping due to insufficient entries")
return nil
}
discard := count - keepN
first, err := n.Store.FirstIndex()
if err != nil {
tr.LazyPrintf("Error: %v", err)
tr.SetError()
return err
}
tr.LazyPrintf("First index: %d", first)
last := first + uint64(discard)
if last > n.Applied.DoneUntil() {
tr.LazyPrintf("Skipping because last index: %d > applied", last)
return nil
}
entries, err := n.Store.Entries(first, last, math.MaxUint64)
if err != nil {
tr.LazyPrintf("Error: %v", err)
tr.SetError()
return err
}
// We find the minimum start ts for which a decision to commit or abort is still pending. We
// should not discard mutations corresponding to this start ts, because we don't persist
// mutations until they are committed.
minPending := posting.Oracle().MinPendingStartTs()
tr.LazyPrintf("Found min pending start ts: %d", minPending)
var snapshotIdx uint64
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal intern.Proposal
if err := proposal.Unmarshal(entry.Data); err != nil {
tr.LazyPrintf("Error: %v", err)
tr.SetError()
return err
}
mu := proposal.Mutations
if mu != nil && mu.StartTs >= minPending {
break
}
snapshotIdx = entry.Index
}
tr.LazyPrintf("Got snapshotIdx: %d. Discarding: %d", snapshotIdx, snapshotIdx-first)
// We should discard at least half of keepN. Otherwise, why bother.
if int(snapshotIdx-first) < int(float64(keepN)*0.5) {
tr.LazyPrintf("Skipping snapshot because insufficient discard entries")
x.Printf("Skipping snapshot. Insufficient discard entries: %d. MinPendingStartTs: %d\n",
snapshotIdx-first, minPending)
		return nil
	}

	snap := &intern.Snapshot{
Context: n.RaftContext,
Index: snapshotIdx,
		MinPendingStartTs: minPending,
	}
	proposal := &intern.Proposal{
Snapshot: snap,
}
tr.LazyPrintf("Proposing snapshot: %+v\n", snap)
	data, err := proposal.Marshal()
	x.Check(err)
if err := n.Raft().Propose(context.Background(), data); err != nil {
tr.LazyPrintf("Error: %v", err)
tr.SetError()
return err
}
tr.LazyPrintf("Done best-effort proposing.")
	return nil
}
func (n *node) joinPeers() error {
pl, err := n.leaderBlocking()
if err != nil {
return err
	}
	gconn := pl.Get()
	c := intern.NewRaftClient(gconn)
x.Printf("Calling JoinCluster via leader: %s", pl.Addr)
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
return x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with JoinCluster call\n")
}
// Checks if its a peer from the leader of the group.
func (n *node) isMember() (bool, error) {
pl, err := n.leaderBlocking()
if err != nil {
return false, err
}
gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling IsPeer")
pr, err := c.IsPeer(n.ctx, n.RaftContext)
if err != nil {
return false, x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with IsPeer call\n")
return pr.Status, nil
}
func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {
var err error
for {
if err = fn(); err == nil {
break
}