Newer
Older
/*
*
* This file is available under the Apache License, Version 2.0,
* with the Commons Clause restriction.
*/
package conn
import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/dgraph-io/dgo/protos/api"
	"github.com/dgraph-io/dgraph/protos/intern"
	"github.com/dgraph-io/dgraph/raftwal"
	"github.com/dgraph-io/dgraph/x"
	"golang.org/x/net/context"
)
ErrDuplicateRaftId = x.Errorf("Node is already part of group")
// sendmsg is a marshalled Raft message queued for delivery to a peer.
type sendmsg struct {
// to is the Raft id of the destination node.
to uint64
// data holds the marshalled raftpb.Message.
data []byte
}
// Node holds the Raft handle, its configuration and the bookkeeping used to
// exchange messages and configuration changes with peers.
//
// NOTE(review): methods in this file also reference Store, peers and Rand
// fields that are not declared in this struct as shown — confirm they exist.
type Node struct {
x.SafeMutex
// SafeMutex is for fields which can be changed after init.
_confState *raftpb.ConfState
_raft raft.Node
// Fields which are never changed after init.
Cfg *raft.Config
MyAddr string
Id uint64
// confChanges maps a configuration-change id to the channel on which its
// outcome is delivered. The map itself is created once, but its entries are
// added and removed while holding the mutex.
confChanges map[uint64]chan error
// messages queues outgoing marshalled Raft messages.
messages chan sendmsg
RaftContext *intern.RaftContext
// Applied is used to keep track of the applied RAFT proposals.
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to BadgerDB).
Applied x.WaterMark
}
// lockedSource wraps a rand.Source with a mutex so that a single source can be
// shared between goroutines.
type lockedSource struct {
	lk  sync.Mutex
	src rand.Source
}

// Int63 returns the next value of the wrapped source while holding the lock.
func (ls *lockedSource) Int63() int64 {
	ls.lk.Lock()
	defer ls.lk.Unlock()
	next := ls.src.Int63()
	return next
}

// Seed re-seeds the wrapped source while holding the lock.
func (ls *lockedSource) Seed(seed int64) {
	ls.lk.Lock()
	defer ls.lk.Unlock()
	ls.src.Seed(seed)
}
func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
n := &Node{
Id: rc.Id,
Store: store,
Cfg: &raft.Config{
ID: rc.Id,
ElectionTick: 100, // 200 ms if we call Tick() every 20 ms.
HeartbeatTick: 1, // 20 ms if we call Tick() every 20 ms.
Storage: store,
MaxSizePerMsg: 256 << 10,
MaxInflightMsgs: 256,
Logger: &raft.DefaultLogger{Logger: x.Logger},
// We use lease-based linearizable ReadIndex for performance, at the cost of
// correctness. With it, communication goes follower->leader->follower, instead of
// follower->leader->majority_of_followers->leader->follower. We lose correctness
// because the Raft ticker might not arrive promptly, in which case the leader would
// falsely believe that its lease is still good.
// Remove both of these, to allow default behavior.
// CheckQuorum causes a lot of network disruption. I'm seeing this with v1.0.6.
// CheckQuorum: true,
// ReadOnlyOption: raft.ReadOnlyLeaseBased,
},
// processConfChange etc are not throttled so some extra delta, so that we don't
// block tick when applyCh is full
confChanges: make(map[uint64]chan error),
RaftContext: rc,
messages: make(chan sendmsg, 100),
Applied: x.WaterMark{Name: fmt.Sprintf("Applied watermark")},
Rand: rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
}
n.Applied.Init()
// TODO: n_ = n is a hack. We should properly init node, and make it part of the server struct.
// This can happen once we get rid of groups.
n_ = n
return n
}
// SetRaft stores the provided raft.Node on this node while holding the lock.
// It asserts that no raft.Node has been stored before.
func (n *Node) SetRaft(r raft.Node) {
n.Lock()
defer n.Unlock()
x.AssertTrue(n._raft == nil)
n._raft = r
}
// Raft returns the raft.Node stored on this node, read under the read lock.
// The result is nil until SetRaft has been called.
func (n *Node) Raft() raft.Node {
n.RLock()
defer n.RUnlock()
return n._raft
}
// SetConfState logs and stores cs as the node's latest ConfState (the value
// generated by ApplyConfChange), while holding the lock.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
n.Lock()
defer n.Unlock()
x.Printf("Setting conf state to %+v\n", cs)
n._confState = cs
}
// DoneConfChange reports the outcome of the configuration change registered
// under id. If a waiter is registered, its entry is removed and err is sent on
// its channel; unknown ids are ignored.
func (n *Node) DoneConfChange(id uint64, err error) {
	n.Lock()
	defer n.Unlock()
	if waiter, pending := n.confChanges[id]; pending {
		delete(n.confChanges, id)
		waiter <- err
	}
}
// storeConfChange registers che under a freshly drawn random id that is not
// already in use and returns that id. The outcome of the change is later
// delivered on che by DoneConfChange.
func (n *Node) storeConfChange(che chan error) uint64 {
	n.Lock()
	defer n.Unlock()
	for {
		candidate := rand.Uint64()
		if _, taken := n.confChanges[candidate]; taken {
			// Collision with a pending change; draw again.
			continue
		}
		n.confChanges[candidate] = che
		return candidate
	}
}
// ConfState returns the latest ConfState stored on the node, read under the
// read lock. It is nil until SetConfState has been called.
func (n *Node) ConfState() *raftpb.ConfState {
n.RLock()
defer n.RUnlock()
return n._confState
}
// Peer returns the address recorded for peer pid and whether an entry for it
// exists. The lookup is done under the read lock.
func (n *Node) Peer(pid uint64) (string, bool) {
n.RLock()
defer n.RUnlock()
addr, ok := n.peers[pid]
return addr, ok
}
// SetPeer records addr as the address of peer pid, replacing any previous
// entry. addr must not be empty; an empty addr trips an assertion.
func (n *Node) SetPeer(pid uint64, addr string) {
	x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
	n.Lock()
	defer n.Unlock()
	// Writing to a nil map panics, so create it on first use.
	if n.peers == nil {
		n.peers = make(map[uint64]string)
	}
	n.peers[pid] = addr
}
func (n *Node) Send(m raftpb.Message) {
Pawan Rawal
committed
x.AssertTruef(n.Id != m.To, "Sending message to itself")
data, err := m.Marshal()
x.Check(err)
// As long as leadership is stable, any attempted Propose() calls should be reflected in the
// next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
// MsgProp to the leader. It is up to the transport layer to get those messages to their
// destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
// (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
// leadership transitions, proposals may get dropped even if the network is reliable.
//
// We can't do a select default here. The messages must be sent to the channel, otherwise we
// should block until the channel can accept these messages. BatchAndSendMessages would take
// care of dropping messages which can't be sent due to network issues to the corresponding
// node. But, we shouldn't take the liberty to do that here. It would take us more time to
// repropose these dropped messages anyway, than to block here a bit waiting for the messages
// channel to clear out.
n.messages <- sendmsg{to: m.To, data: data}
}
// Snapshot returns the snapshot reported by the node's store. It returns an
// error, and an empty snapshot, when called on a nil node or on a node whose
// store is nil.
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
	if n == nil || n.Store == nil {
		return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store.")
	}
	return n.Store.Snapshot()
}
// SaveToStorage writes the hard state, entries and snapshot to the node's
// store and hands any resulting error to x.Check.
func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry, s raftpb.Snapshot) {
	err := n.Store.Save(h, es, s)
	x.Check(err)
}
// PastLife inspects the node's store to decide whether this node has run
// before.
//
// idx is the index of the stored snapshot (zero when the snapshot is empty).
// restart is true when the store holds a non-empty snapshot, a non-empty hard
// state, or more than one log entry. A storage error is returned as rerr; in
// that case idx and restart reflect only the checks completed so far.
func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
	var sp raftpb.Snapshot
	sp, rerr = n.Store.Snapshot()
	if rerr != nil {
		return
	}
	if !raft.IsEmptySnap(sp) {
		x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
		restart = true
		idx = sp.Metadata.Index
	}

	// The store is also used as raft.Config.Storage in NewNode, so it provides
	// InitialState from the raft.Storage interface; only the hard state is
	// needed here.
	var hd raftpb.HardState
	hd, _, rerr = n.Store.InitialState()
	if rerr != nil {
		return
	}
	if !raft.IsEmptyHardState(hd) {
		x.Printf("Found hardstate: %+v\n", hd)
		restart = true
	}

	var num int
	num, rerr = n.Store.NumEntries()
	if rerr != nil {
		return
	}
	x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
	// We'll always have at least one entry.
	if num > 1 {
		restart = true
	}
	return
}
const (
// messageBatchSoftLimit is the number of bytes (payload plus 4-byte length
// prefixes) after which BatchAndSendMessages stops pulling further queued
// messages into the current round. It is a soft limit: the message that
// crosses it is still included.
messageBatchSoftLimit = 10000000
)
func (n *Node) BatchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
Pawan Rawal
committed
failedConn := make(map[uint64]bool)
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
for {
totalSize := 0
sm := <-n.messages
slurp_loop:
for {
var buf *bytes.Buffer
if b, ok := batches[sm.to]; !ok {
buf = new(bytes.Buffer)
batches[sm.to] = buf
} else {
buf = b
}
totalSize += 4 + len(sm.data)
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
if totalSize > messageBatchSoftLimit {
// We limit the batch size, but we aren't pushing back on
// n.messages, because the loop below spawns a goroutine
// to do its dirty work. This is good because right now
// (*node).send fails(!) if the channel is full.
break
}
select {
case sm = <-n.messages:
default:
break slurp_loop
}
}
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
Pawan Rawal
committed
addr, has := n.Peer(to)
pool, err := Get().Get(addr)
if !has || err != nil {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
to, addr, err)
Pawan Rawal
committed
failedConn[to] = true
}
continue
}
failedConn[to] = false
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
Pawan Rawal
committed
go n.doSendMessage(pool, data)
buf.Reset()
}
}
}
Pawan Rawal
committed
func (n *Node) doSendMessage(pool *Pool, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client := pool.Get()
c := intern.NewRaftClient(client)
p := &api.Payload{Data: data}
ch := make(chan error, 1)
go func() {
Pawan Rawal
committed
_, err := c.RaftMessage(ctx, p)
if err != nil {
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
ch <- err
}()
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
// Connect records addr as the address of peer pid and asks the connection
// pools to connect to it.
//
// It does nothing when pid is this node, or when addr is already the recorded
// address for pid. When addr equals this node's own address, the address is
// recorded but no connection is attempted.
func (n *Node) Connect(pid uint64, addr string) {
	if pid == n.Id {
		return
	}
	if paddr, ok := n.Peer(pid); ok && paddr == addr {
		// Already connected.
		return
	}
	if addr == n.MyAddr {
		// TODO: Note this fact in more general peer health info somehow.
		x.Printf("Peer %d claims same host as me\n", pid)
		n.SetPeer(pid, addr)
		return
	}
	// Set up the connection before recording the address. The address is
	// recorded regardless, and BatchAndSendMessages checks for a healthy
	// connection each time it sends.
	Get().Connect(addr)
	n.SetPeer(pid, addr)
}
// DeletePeer removes the recorded address for peer pid. Asking to delete this
// node's own id is a no-op.
func (n *Node) DeletePeer(pid uint64) {
	if pid != n.Id {
		n.Lock()
		defer n.Unlock()
		delete(n.peers, pid)
	}
}
func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
addr, ok := n.Peer(pid)
x.AssertTruef(ok, "Unable to find conn pool for peer: %d", pid)
Addr: addr,
Group: n.RaftContext.Group,
Id: pid,
}
rcBytes, err := rc.Marshal()
x.Check(err)
ch := make(chan error, 1)
id := n.storeConfChange(ch)
err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: id,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
Context: rcBytes,
})
if err != nil {
return err
}
err = <-ch
return err
}
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
if n.Raft() == nil {
return errNoNode
}
if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
return x.Errorf("Node %d not part of group", id)
}
ch := make(chan error, 1)
pid := n.storeConfChange(ch)
err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: pid,
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
})
if err != nil {
return err
}
err = <-ch
return err
// n_ is the package-level node; NewNode points it at the node it creates.
// TODO: Get rid of this in the upcoming changes.
var n_ *Node
// GetNode returns the package-level node set by NewNode, or nil if none has
// been created yet. The read is done while holding nodeLock in read mode.
func (w *RaftServer) GetNode() *Node {
	w.nodeLock.RLock()
	defer w.nodeLock.RUnlock()
	return n_
}
// RaftServer carries the request handlers defined in this file (IsPeer,
// JoinCluster, RaftMessage, Echo) and gives them access to the node via
// GetNode.
type RaftServer struct {
nodeLock sync.RWMutex // protects Node; held in read mode by GetNode.
}
// IsPeer reports whether the node identified by rc.Id is listed in this
// node's current ConfState.
//
// It returns errNoNode when the node or its raft.Node has not been set up. A
// missing ConfState, or one that does not list rc.Id, yields a response with
// Status false and a nil error.
func (w *RaftServer) IsPeer(ctx context.Context, rc *intern.RaftContext) (*intern.PeerResponse,
	error) {
	node := w.GetNode()
	if node == nil || node.Raft() == nil {
		return &intern.PeerResponse{}, errNoNode
	}

	// Read the conf state once through the locked accessor, so the nil check
	// and the loop see the same value and do not race with SetConfState.
	cs := node.ConfState()
	if cs == nil {
		return &intern.PeerResponse{}, nil
	}

	for _, raftIdx := range cs.Nodes {
		if rc.Id == raftIdx {
			return &intern.PeerResponse{Status: true}, nil
		}
	}
	return &intern.PeerResponse{}, nil
}
func (w *RaftServer) JoinCluster(ctx context.Context,
rc *intern.RaftContext) (*api.Payload, error) {
if ctx.Err() != nil {
return &api.Payload{}, ctx.Err()
}
// Commenting out the following checks for now, until we get rid of groups.
// TODO: Uncomment this after groups is removed.
node := w.GetNode()
if node == nil || node.Raft() == nil {
return nil, errNoNode
}
// Check that the new node is from the same group as me.
if rc.Group != node.RaftContext.Group {
return nil, x.Errorf("Raft group mismatch")
}
// Also check that the new node is not me.
if rc.Id == node.RaftContext.Id {
return nil, ErrDuplicateRaftId
}
// Check that the new node is not already part of the group.
if addr, ok := node.peers[rc.Id]; ok && rc.Addr != addr {
Get().Connect(addr)
// There exists a healthy connection to server with same id.
if _, err := Get().Get(addr); err == nil {
return &api.Payload{}, ErrDuplicateRaftId
}
node.Connect(rc.Id, rc.Addr)
c := make(chan error, 1)
go func() { c <- node.AddToCluster(ctx, rc.Id) }()
select {
case <-ctx.Done():
return &api.Payload{}, ctx.Err()
case err := <-c:
}
}
var (
// errNoNode is returned by handlers in this file when the node, or its
// raft.Node, has not been set up.
errNoNode = fmt.Errorf("No node has been set up yet")
)
// applyMessage feeds one received Raft message into this node's raft.Node.
//
// msg.Context is decoded as a RaftContext describing the sender. The message
// is rejected with errNoNode when this node is not set up or when the sender
// belongs to a different group. Otherwise the sender's address is recorded via
// Connect and the message is stepped; the result is the Step error, or
// ctx.Err() if ctx is done first.
func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
	var rc intern.RaftContext
	x.Check(rc.Unmarshal(msg.Context))

	node := w.GetNode()
	if node == nil || node.Raft() == nil {
		return errNoNode
	}
	if rc.Group != node.RaftContext.Group {
		return errNoNode
	}
	node.Connect(msg.From, rc.Addr)

	// Buffered so the goroutine can finish even if ctx is done first.
	c := make(chan error, 1)
	go func() { c <- node.Raft().Step(ctx, msg) }()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-c:
		return err
	}
}
// RaftMessage handles a batch of Raft messages sent by a peer.
//
// query.Data is a sequence of frames, each a 4-byte little-endian length
// followed by that many bytes of marshalled raftpb.Message (the framing
// written by BatchAndSendMessages). Frames are applied in order; the first
// error from applyMessage stops processing and is returned. A frame whose
// declared size runs past the end of the data yields an error.
func (w *RaftServer) RaftMessage(ctx context.Context,
	query *api.Payload) (*api.Payload, error) {
	if ctx.Err() != nil {
		return &api.Payload{}, ctx.Err()
	}

	for idx := 0; idx < len(query.Data); {
		x.AssertTruef(len(query.Data[idx:]) >= 4,
			"Slice left of size: %v. Expected at least 4.", len(query.Data[idx:]))

		sz := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
		idx += 4
		if idx+sz > len(query.Data) {
			return &api.Payload{}, x.Errorf(
				"Invalid query. Specified size %v overflows slice [%v,%v)\n",
				sz, idx, len(query.Data))
		}

		msg := raftpb.Message{}
		x.Check(msg.Unmarshal(query.Data[idx : idx+sz]))
		if err := w.applyMessage(ctx, msg); err != nil {
			return &api.Payload{}, err
		}
		idx += sz
	}
	return &api.Payload{}, nil
}
// Echo is an RPC used to check the connection with other workers after the
// worker TCP server for this instance starts.
func (w *RaftServer) Echo(ctx context.Context, in *api.Payload) (*api.Payload, error) {
return &api.Payload{Data: in.Data}, nil