/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package worker
import (
	"encoding/binary"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"golang.org/x/net/context"
	"golang.org/x/net/trace"

	"github.com/dgraph-io/badger/y"
	"github.com/dgraph-io/dgo/protos/api"
	dy "github.com/dgraph-io/dgo/y"
	"github.com/dgraph-io/dgraph/conn"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/intern"
	"github.com/dgraph-io/dgraph/raftwal"
	"github.com/dgraph-io/dgraph/schema"
	"github.com/dgraph-io/dgraph/x"
)
type proposalCtx struct {
ch chan error
ctx context.Context
cnt int // used for reference counting
	// Since each proposal consists of multiple tasks, we need to store any
	// non-nil error returned by a task.
err error
index uint64 // RAFT index for the proposal.
// Used for writing all deltas at end
txn *posting.Txn
}
type proposals struct {
sync.RWMutex
	ids map[uint32]*proposalCtx
}
func (p *proposals) Store(pid uint32, pctx *proposalCtx) bool {
p.Lock()
defer p.Unlock()
if _, has := p.ids[pid]; has {
return false
}
	p.ids[pid] = pctx
	return true
}
func (p *proposals) IncRef(pid uint32, count int) {
p.Lock()
defer p.Unlock()
pd, has := p.ids[pid]
x.AssertTrue(has)
	pd.cnt += count
}

func (p *proposals) pctx(pid uint32) *proposalCtx {
p.RLock()
defer p.RUnlock()
return p.ids[pid]
}
func (p *proposals) CtxAndTxn(pid uint32) (context.Context, *posting.Txn) {
	p.RLock()
	defer p.RUnlock()
pd, has := p.ids[pid]
x.AssertTrue(has)
return pd.ctx, pd.txn
}
func (p *proposals) Done(pid uint32, err error) {
p.Lock()
	defer p.Unlock()
	pd, has := p.ids[pid]
if !has {
return
}
x.AssertTrue(pd.cnt > 0 && pd.index != 0)
pd.cnt -= 1
if err != nil {
pd.err = err
}
if pd.cnt > 0 {
return
}
delete(p.ids, pid)
pd.ch <- pd.err
	// We emit one pending watermark as soon as we read from rd.CommittedEntries.
// Since the tasks are executed in goroutines we need one guarding watermark which
// is done only when all the pending sync/applied marks have been emitted.
groups().Node.Applied.Done(pd.index)
}
func (p *proposals) Has(pid uint32) bool {
p.RLock()
defer p.RUnlock()
_, has := p.ids[pid]
return has
}
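
// The sketch below is illustrative only and is not called anywhere in the package: it
// shows the intended bookkeeping around the proposals map. A proposer registers a
// proposalCtx under a random id before handing the proposal to RAFT; the apply path
// later looks it up by the same id and eventually calls Done with the final error. The
// literal values here are arbitrary.
func exampleProposalBookkeeping() {
	p := proposals{ids: make(map[uint32]*proposalCtx)}
	pctx := &proposalCtx{ch: make(chan error, 1), ctx: context.Background(), cnt: 1}
	// Store returns false on an id collision, so retry with a fresh random id.
	for !p.Store(rand.Uint32(), pctx) {
	}
	// ... propose to RAFT; once the entry is applied, Done(id, err) delivers err on
	// pctx.ch and removes the entry from the map ...
}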
type node struct {
*conn.Node
// Fields which are never changed after init.
applyCh chan raftpb.Entry
ctx context.Context
stop chan struct{} // to send the stop signal to Run
done chan struct{} // to check whether node is running or not
gid uint32
props proposals
sch *scheduler
	canCampaign bool
}
func newNode(gid uint32, id uint64, myAddr string) *node {
x.Printf("Node ID: %v with GroupID: %v\n", id, gid)
	rc := &intern.RaftContext{
Addr: myAddr,
Group: gid,
Id: id,
}
m := conn.NewNode(rc)
props := proposals{
ids: make(map[uint32]*proposalCtx),
}
n := &node{
Node: m,
ctx: context.Background(),
gid: gid,
		// processConfChange etc. are not throttled, so add some extra buffer so that
		// we don't block the Raft tick loop when applyCh is full.
applyCh: make(chan raftpb.Entry, Config.NumPendingProposals+1000),
props: props,
stop: make(chan struct{}),
done: make(chan struct{}),
sch: new(scheduler),
}
return n
}
type header struct {
proposalId uint32
msgId uint16
}
func (h *header) Length() int {
return 6 // 4 bytes for proposalId, 2 bytes for msgId.
}
func (h *header) Encode() []byte {
result := make([]byte, h.Length())
binary.LittleEndian.PutUint32(result[0:4], h.proposalId)
binary.LittleEndian.PutUint16(result[4:6], h.msgId)
return result
}
func (h *header) Decode(in []byte) {
h.proposalId = binary.LittleEndian.Uint32(in[0:4])
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
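
// Illustrative round trip for the header above; it is not used anywhere in this file.
// Encode lays out proposalId in the first four bytes and msgId in the next two, both
// little-endian, and Decode reverses that. The values below are arbitrary.
func exampleHeaderRoundTrip() {
	h := header{proposalId: 11, msgId: 3}
	buf := h.Encode() // len(buf) == 6
	var out header
	out.Decode(buf)
	// out now equals h.
}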
// proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal
// to be applied (written to WAL) to all the nodes in the group.
func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) error {
	if n.Raft() == nil {
		return x.Errorf("Raft isn't initialized yet")
	}
// TODO: Should be based on number of edges (amount of work)
	pendingProposals <- struct{}{}
	x.PendingProposals.Add(1)
defer func() { <-pendingProposals; x.PendingProposals.Add(-1) }()
if ctx.Err() != nil {
return ctx.Err()
}
// Do a type check here if schema is present
// In very rare cases invalid entries might pass through raft, which would
// be persisted, we do best effort schema check while writing
if proposal.Mutations != nil {
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
			} else if tablet == nil || tablet.GroupId != groups().groupId() {
// Tablet can move by the time request reaches here.
return errUnservedTablet
}
if typ, err := schema.State().TypeOf(edge.Attr); err != nil {
continue
			} else if err := ValidateAndConvert(edge, typ); err != nil {
				return err
			}
		}
for _, schema := range proposal.Mutations.Schema {
if tablet := groups().Tablet(schema.Predicate); tablet != nil && tablet.ReadOnly {
return errPredicateMoving
}
			if err := checkSchema(schema); err != nil {
				return err
			}
		}
	}

	che := make(chan error, 1)
pctx := &proposalCtx{
ch: che,
ctx: ctx,
		cnt: 1,
	}
	for {
		id := rand.Uint32()
		if n.props.Store(id, pctx) {
			proposal.Id = id
			break
		}
	}

	sz := proposal.Size()
	slice := make([]byte, sz)
	upto, err := proposal.MarshalTo(slice)
if err != nil {
return err
}
// We don't timeout on a mutation which has already been proposed.
if err = n.Raft().Propose(ctx, slice[:upto]); err != nil {
return x.Wrapf(err, "While proposing")
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Waiting for the proposal.")
err = <-che
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Raft Propose error: %v", err)
		}
}
	return err
}
func (n *node) processMutation(task *task) error {
pid := task.pid
ridx := task.rid
edge := task.edge
ctx, txn := n.props.CtxAndTxn(pid)
if txn.ShouldAbort() {
		return dy.ErrConflict
	}
rv := x.RaftValue{Group: n.gid, Index: ridx}
ctx = context.WithValue(ctx, "raft", rv)
if err := runMutation(ctx, edge, txn); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("process mutation: %v", err)
return err
}
return nil
}
func (n *node) processSchemaMutations(pid uint32, index uint64,
startTs uint64, s *intern.SchemaUpdate) error {
ctx, _ := n.props.CtxAndTxn(pid)
rv := x.RaftValue{Group: n.gid, Index: index}
ctx = context.WithValue(ctx, "raft", rv)
if err := runSchemaMutation(ctx, s, startTs); err != nil {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf(err.Error())
}
return err
}
return nil
}
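
// Illustrative helper (not used anywhere): the functions above attach the RAFT group
// and index to the context under the "raft" key via context.WithValue. A downstream
// consumer can recover it with a type assertion like this; the helper name is made up
// for this sketch.
func exampleRaftValueFromContext(ctx context.Context) (x.RaftValue, bool) {
	rv, ok := ctx.Value("raft").(x.RaftValue)
	return rv, ok
}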
func (n *node) applyConfChange(e raftpb.Entry) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
if cc.Type == raftpb.ConfChangeRemoveNode {
n.DeletePeer(cc.NodeID)
} else if len(cc.Context) > 0 {
		var rc intern.RaftContext
		x.Check(rc.Unmarshal(cc.Context))
n.Connect(rc.Id, rc.Addr)
}
cs := n.Raft().ApplyConfChange(cc)
n.SetConfState(cs)
n.DoneConfChange(cc.ID, nil)
// Not present in proposal map
n.Applied.Done(e.Index)
	groups().triggerMembershipSync()
}
type KeyValueOrCleanProposal struct {
raftIdx uint64
proposal *intern.Proposal
}
func (n *node) processKeyValueOrCleanProposals(
kvChan chan KeyValueOrCleanProposal) {
	// Run KeyValueProposals and CleanPredicate one by one, always.
	// During a predicate move we first clean the predicate and then propose key values,
	// and we wait for the clean to finish before proposing the key values. During replay
	// after a restart, running these proposals in goroutines would lose that guarantee,
	// so always run them sequentially.
for e := range kvChan {
if len(e.proposal.Kv) > 0 {
n.processKeyValues(e.raftIdx, e.proposal.Id, e.proposal.Kv)
} else if len(e.proposal.CleanPredicate) > 0 {
n.deletePredicate(e.raftIdx, e.proposal.Id, e.proposal.CleanPredicate)
} else {
x.Fatalf("Unknown proposal, %+v\n", e.proposal)
}
}
}
func (n *node) processApplyCh() {
kvChan := make(chan KeyValueOrCleanProposal, 1000)
go n.processKeyValueOrCleanProposals(kvChan)
for e := range n.applyCh {
		if len(e.Data) == 0 {
// This is not in the proposal map
			n.Applied.Done(e.Index)
			continue
		}
if e.Type == raftpb.EntryConfChange {
n.applyConfChange(e)
			continue
}
proposal := &intern.Proposal{}
		if err := proposal.Unmarshal(e.Data); err != nil {
x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
// One final applied and synced watermark would be emitted when proposal ctx ref count
// becomes zero.
pctx := n.props.pctx(proposal.Id)
if pctx == nil {
// This is during replay of logs after restart
pctx = &proposalCtx{
ch: make(chan error, 1),
ctx: n.ctx,
cnt: 1,
			}
		}
pctx.index = e.Index
n.props.Store(proposal.Id, pctx)
posting.TxnMarks().Begin(e.Index)
		if proposal.Mutations != nil {
			// syncmarks for this shouldn't be marked done until it's committed.
			n.sch.schedule(proposal, e.Index)
} else if len(proposal.Kv) > 0 {
kvChan <- KeyValueOrCleanProposal{
raftIdx: e.Index,
proposal: proposal,
}
} else if proposal.State != nil {
// This state needn't be snapshotted in this group, on restart we would fetch
// a state which is latest or equal to this.
groups().applyState(proposal.State)
// When proposal is done it emits done watermarks.
posting.TxnMarks().Done(e.Index)
n.props.Done(proposal.Id, nil)
		} else if len(proposal.CleanPredicate) > 0 {
			kvChan <- KeyValueOrCleanProposal{
raftIdx: e.Index,
proposal: proposal,
}
} else if proposal.TxnContext != nil {
go n.commitOrAbort(e.Index, proposal.Id, proposal.TxnContext)
		}
}
close(kvChan)
}
func (n *node) commitOrAbort(index uint64, pid uint32, tctx *api.TxnContext) {
	ctx, _ := n.props.CtxAndTxn(pid)
	_, err := commitOrAbort(ctx, tctx)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Status of commitOrAbort %+v %v\n", tctx, err)
}
if err == nil {
		posting.Txns().Done(tctx.StartTs)
	}
	posting.TxnMarks().Done(index)
	n.props.Done(pid, err)
}
func (n *node) deletePredicate(index uint64, pid uint32, predicate string) {
	ctx, _ := n.props.CtxAndTxn(pid)
	rv := x.RaftValue{Group: n.gid, Index: index}
	ctx = context.WithValue(ctx, "raft", rv)
err := posting.DeletePredicate(ctx, predicate)
	posting.TxnMarks().Done(index)
	n.props.Done(pid, err)
}
func (n *node) processKeyValues(index uint64, pid uint32, kvs []*intern.KV) error {
ctx, _ := n.props.CtxAndTxn(pid)
err := populateKeyValues(ctx, kvs)
posting.TxnMarks().Done(index)
n.props.Done(pid, err)
return nil
}
func (n *node) applyAllMarks(ctx context.Context) {
// Get index of last committed.
lastIndex := n.Applied.LastIndex()
n.Applied.WaitForMark(ctx, lastIndex)
}
func (n *node) retrieveSnapshot() error {
pool := groups().Leader(groups().groupId())
if pool == nil {
// retrieveSnapshot is blocking at initial start and leader election for a group might
// not have happened when it is called. If we can't find a leader, get latest state from
// Zero.
if err := UpdateMembershipState(context.Background()); err != nil {
return fmt.Errorf("Error while trying to update membership state: %+v", err)
}
return fmt.Errorf("Unable to reach leader in group %d", n.gid)
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
// Safe to keep this line
n.applyAllMarks(n.ctx)
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
if _, err := n.populateShard(pstore, pool); err != nil {
return fmt.Errorf("Cannot retrieve snapshot from peer, error: %v\n", err)
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
if err := schema.LoadFromDb(); err != nil {
return fmt.Errorf("Error while initilizating schema: %+v\n", err)
}
groups().triggerMembershipSync()
	return nil
}
func (n *node) Run() {
	firstRun := true
	var leader bool
	// See also our configuration of HeartbeatTick and ElectionTick.
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	rcBytes, err := n.RaftContext.Marshal()
	x.Check(err)
	// Ensure we don't exit unless any snapshot in progress is done.
closer := y.NewCloser(1)
go n.snapshotPeriodically(closer)
for {
select {
		case <-ticker.C:
			n.Raft().Tick()
case rd := <-n.Raft().Ready():
			if rd.SoftState != nil {
groups().triggerMembershipSync()
leader = rd.RaftState == raft.StateLeader
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
msg.Context = rcBytes
n.Send(msg)
}
}
// First store the entries, then the hardstate and snapshot.
x.Check(n.Wal.Store(n.gid, rd.HardState, rd.Entries))
// Now store them in the in-memory store.
n.SaveToStorage(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
				var rc intern.RaftContext
				x.Check(rc.Unmarshal(rd.Snapshot.Data))
				x.AssertTrue(rc.Group == n.gid)
if rc.Id != n.Id {
// NOTE: Retrieving snapshot here is OK, after storing it above in WAL, because
// rc.Id != n.Id.
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
// It's ok to block tick while retrieving snapshot, since it's a follower
n.retryUntilSuccess(n.retrieveSnapshot, 100*time.Millisecond)
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
x.Check(n.Wal.StoreSnapshot(n.gid, rd.Snapshot))
n.SaveSnapshot(rd.Snapshot)
}
lc := len(rd.CommittedEntries)
if lc > 0 {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Found %d committed entries", len(rd.CommittedEntries))
}
}
// Now schedule or apply committed entries.
for idx, entry := range rd.CommittedEntries {
				// We need applied watermarks for schema mutations too, for read linearizability.
				// Applied watermarks need to be emitted as soon as possible, and sequentially.
// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false}
// then doneUntil would be set as 4 as soon as Mark{4,true} is done and before
// Mark{3, false} is emitted. So it's safer to emit watermarks as soon as
// possible sequentially
n.Applied.Begin(entry.Index)
if !leader && entry.Type == raftpb.EntryConfChange {
// Config changes in followers must be applied straight away.
n.applyConfChange(entry)
} else {
// TODO: Stop accepting requests when applyCh is full
// Just queue up to be processed. Don't wait on them.
n.applyCh <- entry
}
// Move to debug log later.
// Sometimes after restart there are too many entries to replay, so log so that we
// know Run loop is replaying them.
if lc > 1e5 && idx%5000 == 0 {
x.Printf("In run loop applying committed entries, idx: [%v], pending: [%v]\n",
idx, lc-idx)
}
}
if lc > 1e5 {
x.Println("All committed entries sent to applyCh.")
}
if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
msg.Context = rcBytes
n.Send(msg)
				}
			}
			n.Raft().Advance()
			if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
				firstRun = false
			}

		case <-n.stop:
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
go func() {
select {
case <-n.ctx.Done(): // time out
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("context timed out while transfering leadership")
}
					case <-time.After(1 * time.Second):
						if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Timed out transfering leadership")
}
					}
					n.Raft().Stop()
					closer.SignalAndWait()
close(n.done)
}()
} else {
n.Raft().Stop()
closer.SignalAndWait()
				close(n.done)
			}
return
}
}
}
func (n *node) Stop() {
select {
case n.stop <- struct{}{}:
case <-n.done:
// already stopped.
return
}
<-n.done // wait for Run to respond.
}
func (n *node) snapshotPeriodically(closer *y.Closer) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
for {
select {
		case <-ticker.C:
			// Some proposals, like a predicate move, can consume around 32MB each, so keeping
			// too many of them around increases memory usage. Snapshot as soon as possible.
n.snapshot(10)
case <-closer.HasBeenClosed():
closer.Done()
return
}
}
}
func (n *node) abortOldTransactions(pending uint64) {
pl := groups().Leader(0)
if pl == nil {
return
}
zc := intern.NewZeroClient(pl.Get())
// Aborts if not already committed.
startTimestamps := posting.Txns().TxnsSinceSnapshot(pending)
req := &intern.TxnTimestamps{Ts: startTimestamps}
zc.TryAbort(context.Background(), req)
}
func (n *node) snapshot(skip uint64) {
	water := posting.TxnMarks()
	le := water.DoneUntil()
existing, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
if le <= si+skip {
applied := n.Applied.DoneUntil()
// If difference grows above 1.5 * ForceAbortDifference we try to abort old transactions
if applied-le > 1.5*x.ForceAbortDifference && skip != 0 {
// Print warning if difference grows above 3 * x.ForceAbortDifference. Shouldn't ideally
// happen as we abort oldest 20% when it grows above 1.5 times.
if applied-le > 3*x.ForceAbortDifference {
x.Printf("Couldn't take snapshot, txn watermark: [%d], applied watermark: [%d]\n",
le, applied)
}
// Try aborting pending transactions here.
n.abortOldTransactions(applied - le)
		}
		return
	}
	snapshotIdx := le - skip
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Taking snapshot for group: %d at watermark: %d\n", n.gid, snapshotIdx)
}
	rc, err := n.RaftContext.Marshal()
	x.Check(err)
	s, err := n.Store.CreateSnapshot(snapshotIdx, n.ConfState(), rc)
	x.Checkf(err, "While creating snapshot")
x.Checkf(n.Store.Compact(snapshotIdx), "While compacting snapshot")
x.Printf("Writing snapshot at index: %d, applied mark: %d\n", snapshotIdx,
n.Applied.DoneUntil())
	x.Check(n.Wal.StoreSnapshot(n.gid, s))
}
func (n *node) joinPeers() error {
// Get leader information for MY group.
pl := groups().Leader(n.gid)
if pl == nil {
if err := UpdateMembershipState(context.Background()); err != nil {
return fmt.Errorf("Error while trying to update membership state: %+v", err)
}
return x.Errorf("Unable to reach leader or any other server in group %d", n.gid)
	}

	gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling JoinCluster")
ctx, cancel := context.WithTimeout(n.ctx, time.Second)
defer cancel()
// JoinCluster can block indefinitely, raft ignores conf change proposal
// if it has pending configuration.
if _, err := c.JoinCluster(ctx, n.RaftContext); err != nil {
return x.Errorf("Error while joining cluster: %+v\n", err)
}
x.Printf("Done with JoinCluster call\n")
}
func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {
var err error
for {
if err = fn(); err == nil {
break
}
x.Printf("Error while calling fn: %v. Retrying...\n", err)
time.Sleep(pause)
}
}
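
// Illustrative usage (not called anywhere): retryUntilSuccess accepts any func() error,
// including closures, not just the node methods it is used with elsewhere in this file.
// The attempt count and pause below are arbitrary.
func exampleRetryUntilSuccess(n *node) {
	attempts := 0
	n.retryUntilSuccess(func() error {
		attempts++
		if attempts < 3 {
			return x.Errorf("not ready yet")
		}
		return nil
	}, 50*time.Millisecond)
}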
// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode(wal *raftwal.Wal) {
	idx, restart, err := n.InitFromWal(wal)
	x.Check(err)
	posting.TxnMarks().SetDoneUntil(idx)
	if restart {
x.Printf("Restarting node for group: %d\n", n.gid)
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
if !raft.IsEmptySnap(sp) {
members := groups().members(n.gid)
for _, id := range sp.Metadata.ConfState.Nodes {
n.Connect(id, members[id].Addr)
			}
		}
n.SetRaft(raft.RestartNode(n.Cfg))
x.Printf("New Node for group: %d\n", n.gid)
if _, hasPeer := groups().MyPeer(); hasPeer {
			// Get snapshot before joining peers as it can take time to retrieve it and we don't
// want the quorum to be inactive when it happens.
x.Println("Retrieving snapshot.")
n.retryUntilSuccess(n.retrieveSnapshot, time.Second)
x.Println("Trying to join peers.")
n.retryUntilSuccess(n.joinPeers, time.Second)
n.SetRaft(raft.StartNode(n.Cfg, nil))
		} else {
			peers := []raft.Peer{{ID: n.Id}}
n.SetRaft(raft.StartNode(n.Cfg, peers))
// Trigger election, so this node can become the leader of this single-node cluster.
n.canCampaign = true
		}
	}
go n.Run()
go n.BatchAndSendMessages()
}
func (n *node) AmLeader() bool {
r := n.Raft()
	return r.Status().Lead == r.Status().ID
}