/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package worker
import (
"encoding/binary"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
"github.com/dgraph-io/dgraph/protos/taskp"
"github.com/dgraph-io/dgraph/protos/workerp"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
Manish R Jain
committed
"github.com/dgraph-io/dgraph/x"
)
const (
	proposalMutation  = 0
	proposalReindex   = 1
	ErrorNodeIDExists = "Error Node ID already exists in the cluster"
)
// peerPool stores the peers per node and the addresses corresponding to them.
// We then use pools().get(addr) to obtain an active connection to those addresses.
type peerPool struct {
sync.RWMutex
peers map[uint64]string
}
func (p *peerPool) Get(id uint64) string {
p.RLock()
defer p.RUnlock()
return p.peers[id]
}
func (p *peerPool) Set(id uint64, addr string) {
p.Lock()
defer p.Unlock()
p.peers[id] = addr
}
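// proposals maps pending proposal IDs to the channel on which ProposeAndWait
// blocks. When a proposal is applied, Done sends the result on that channel
// exactly once and removes the entry.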
type proposals struct {
sync.RWMutex
ids map[uint32]chan error
}
func (p *proposals) Store(pid uint32, ch chan error) {
p.Lock()
defer p.Unlock()
_, has := p.ids[pid]
x.AssertTruef(!has, "Same proposal is being stored again.")
p.ids[pid] = ch
}
func (p *proposals) Done(pid uint32, err error) {
var ch chan error
p.Lock()
ch, has := p.ids[pid]
if has {
delete(p.ids, pid)
}
p.Unlock()
if !has {
return
}
ch <- err
}
type sendmsg struct {
to uint64
data []byte
}
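// node wraps an etcd/raft Node for a single group. Raft state that can change
// after init (_raft, _confState) is guarded by SafeMutex; everything else is
// set once in newNode. Committed entries are pushed onto applyCh by Run and
// consumed by processApplyCh.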
type node struct {
x.SafeMutex
// SafeMutex is for fields which can be changed after init.
_confState *raftpb.ConfState
_raft raft.Node
// Fields which are never changed after init.
	cfg         *raft.Config
	applyCh     chan raftpb.Entry
	ctx         context.Context
	stop        chan struct{} // to send the stop signal to Run
	done        chan struct{} // to check whether node is running or not
	gid         uint32
	id          uint64
	messages    chan sendmsg
	peers       peerPool
	props       proposals
	raftContext *taskp.RaftContext
	store       *raft.MemoryStorage
	wal         *raftwal.Wal
	canCampaign bool

	// applied is used to keep track of the applied RAFT proposals.
	// The stages are proposed -> committed (accepted by cluster) ->
	// applied (to PL) -> synced (to RocksDB).
	applied x.WaterMark
}
// SetRaft would set the provided raft.Node to this node.
// It would panic if a raft.Node is already set.
func (n *node) SetRaft(r raft.Node) {
n.Lock()
defer n.Unlock()
x.AssertTrue(n._raft == nil)
n._raft = r
}
// Raft would return back the raft.Node stored in the node.
func (n *node) Raft() raft.Node {
n.RLock()
defer n.RUnlock()
return n._raft
}
// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *node) SetConfState(cs *raftpb.ConfState) {
n.Lock()
defer n.Unlock()
n._confState = cs
}
// ConfState would return the latest ConfState stored in node.
func (n *node) ConfState() *raftpb.ConfState {
n.RLock()
defer n.RUnlock()
return n._confState
}
func newNode(gid uint32, id uint64, myAddr string) *node {
fmt.Printf("Node with GroupID: %v, ID: %v\n", gid, id)
peers := peerPool{
peers: make(map[uint64]string),
}
props := proposals{
ids: make(map[uint32]chan error),
}
store := raft.NewMemoryStorage()
	rc := &taskp.RaftContext{
		Addr:  myAddr,
		Group: gid,
		Id:    id,
	}
n := &node{
ctx: context.Background(),
id: id,
gid: gid,
store: store,
cfg: &raft.Config{
ID: id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: store,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
},
applyCh: make(chan raftpb.Entry, numPendingMutations),
peers: peers,
props: props,
raftContext: rc,
messages: make(chan sendmsg, 1000),
stop: make(chan struct{}),
done: make(chan struct{}),
}
n.applied = x.WaterMark{Name: fmt.Sprintf("Committed: Group %d", n.gid)}
return n
}
func (n *node) Connect(pid uint64, addr string) {
for n == nil {
// Sometimes this function causes a panic. My guess is that n is sometimes still uninitialized.
time.Sleep(time.Second)
}
if pid == n.id {
return
}
if paddr := n.peers.Get(pid); paddr == addr {
return
}
pools().connect(addr)
n.peers.Set(pid, addr)
}
func (n *node) AddToCluster(ctx context.Context, pid uint64) error {
addr := n.peers.Get(pid)
x.AssertTruef(len(addr) > 0, "Unable to find conn pool for peer: %d", pid)
	rc := &taskp.RaftContext{
		Addr:  addr,
		Group: n.raftContext.Group,
		Id:    pid,
	}
rcBytes, err := rc.Marshal()
x.Check(err)
return n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: pid,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
		Context: rcBytes,
	})
}
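// header is a small fixed-size frame prefix: 4 bytes of proposalId followed by
// 2 bytes of msgId, both little-endian. For example, Encode for
// {proposalId: 7, msgId: 2} yields the 6 bytes 07 00 00 00 02 00.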
type header struct {
proposalId uint32
msgId uint16
}
func (h *header) Length() int {
return 6 // 4 bytes for proposalId, 2 bytes for msgId.
}
func (h *header) Encode() []byte {
result := make([]byte, h.Length())
binary.LittleEndian.PutUint32(result[0:4], h.proposalId)
binary.LittleEndian.PutUint16(result[4:6], h.msgId)
return result
}
func (h *header) Decode(in []byte) {
h.proposalId = binary.LittleEndian.Uint32(in[0:4])
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
var slicePool = sync.Pool{
New: func() interface{} {
return make([]byte, 256<<10)
},
}
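// ProposeAndWait marshals the proposal into a byte slice prefixed with a single
// kind byte (proposalMutation or proposalReindex), proposes it to Raft, and
// blocks until the entry has been applied (props.Done fires) or ctx is done.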
func (n *node) ProposeAndWait(ctx context.Context, proposal *taskp.Proposal) error {
	if n.Raft() == nil {
		return x.Errorf("RAFT isn't initialized yet")
	}
// Do a type check here if schema is present
// In very rare cases invalid entries might pass through raft, which would
// be persisted, we do best effort schema check while writing
if proposal.Mutations != nil {
for _, edge := range proposal.Mutations.Edges {
if typ, err := schema.State().TypeOf(edge.Attr); err != nil {
continue
} else if err := validateAndConvert(edge, typ); err != nil {
return err
}
}
for _, schema := range proposal.Mutations.Schema {
if err := checkSchema(schema); err != nil {
return err
}
		}
	}

	proposal.Id = rand.Uint32()
slice := slicePool.Get().([]byte)
if len(slice) < proposal.Size() {
slice = make([]byte, proposal.Size())
}
defer slicePool.Put(slice)
upto, err := proposal.MarshalTo(slice)
if err != nil {
return err
}
proposalData := make([]byte, upto+1)
// Examining first byte of proposalData will quickly tell us what kind of
// proposal this is.
if proposal.RebuildIndex == nil {
proposalData[0] = proposalMutation
} else {
proposalData[0] = proposalReindex
}
copy(proposalData[1:], slice)
	che := make(chan error, 1)
	n.props.Store(proposal.Id, che)
if err = n.Raft().Propose(ctx, proposalData); err != nil {
return x.Wrapf(err, "While proposing")
}
// Wait for the proposal to be committed.
if proposal.Mutations != nil {
x.Trace(ctx, "Waiting for the proposal: mutations.")
} else {
x.Trace(ctx, "Waiting for the proposal: membership update.")
}
select {
case err = <-che:
x.TraceError(ctx, err)
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (n *node) send(m raftpb.Message) {
	x.AssertTruef(n.id != m.To, "Sending message to itself")
	data, err := m.Marshal()
	x.Check(err)
	if m.Type != raftpb.MsgHeartbeat && m.Type != raftpb.MsgHeartbeatResp {
		fmt.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To)
	}
	select {
	case n.messages <- sendmsg{to: m.To, data: data}:
		// pass
	default:
		x.Fatalf("Unable to push messages to channel in send")
	}
}
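// batchAndSendMessages drains n.messages and accumulates outgoing data per
// destination node. Each message is framed as a 4-byte little-endian length
// followed by the payload; RaftMessage on the receiving side parses the same
// framing. Batches are flushed roughly every 10ms, each in its own goroutine.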
func (n *node) batchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
for {
select {
case sm := <-n.messages:
var buf *bytes.Buffer
if b, ok := batches[sm.to]; !ok {
buf = new(bytes.Buffer)
batches[sm.to] = buf
} else {
buf = b
}
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
default:
start := time.Now()
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, data)
buf.Reset()
}
// Add a sleep clause to avoid a busy wait loop if there's no input on n.messages.
sleepFor := 10*time.Millisecond - time.Since(start)
time.Sleep(sleepFor)
}
}
}
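// doSendMessage sends one batched payload to the peer over gRPC with a
// one-second timeout. Errors are ignored: Raft retransmits anything that
// didn't make it.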
func (n *node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
	addr := n.peers.Get(to)
	if len(addr) == 0 {
return
}
pool := pools().get(addr)
conn, err := pool.Get()
x.Check(err)
defer pool.Put(conn)
c := workerp.NewWorkerClient(conn)
p := &workerp.Payload{Data: data}
ch := make(chan error, 1)
go func() {
_, err = c.RaftMessage(ctx, p)
ch <- err
}()
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
func (n *node) processMutation(e raftpb.Entry, m *taskp.Mutations) error {
// TODO: Need to pass node and entry index.
rv := x.RaftValue{Group: n.gid, Index: e.Index}
ctx := context.WithValue(n.ctx, "raft", rv)
if err := runMutations(ctx, m.Edges); err != nil {
x.TraceError(n.ctx, err)
return err
}
return nil
}
func (n *node) processSchemaMutations(e raftpb.Entry, m *taskp.Mutations) error {
// TODO: Need to pass node and entry index.
rv := x.RaftValue{Group: n.gid, Index: e.Index}
ctx := context.WithValue(n.ctx, "raft", rv)
if err := runSchemaMutations(ctx, m.Schema); err != nil {
x.TraceError(n.ctx, err)
return err
}
return nil
}
func (n *node) processMembership(e raftpb.Entry, mm *taskp.Membership) error {
x.Printf("group: %v Addr: %q leader: %v dead: %v\n",
mm.GroupId, mm.Addr, mm.Leader, mm.AmDead)
groups().applyMembershipUpdate(e.Index, mm)
return nil
}
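// process applies a single committed entry: either data mutations or a
// membership update. The pending channel bounds how many entries are processed
// concurrently, and the deferred sends tell the applied and sync watermarks
// that this index is done.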
func (n *node) process(e raftpb.Entry, pending chan struct{}) {
defer func() {
n.applied.Ch <- x.Mark{Index: e.Index, Done: true}
		posting.SyncMarkFor(n.gid).Ch <- x.Mark{Index: e.Index, Done: true}
	}()

	if e.Type != raftpb.EntryNormal {
return
}
pending <- struct{}{} // This will block until we can write to it.
x.AssertTrue(len(e.Data) > 0)
	var proposal taskp.Proposal
	x.Checkf(proposal.Unmarshal(e.Data[1:]), "Unable to parse entry: %+v", e)
var err error
if proposal.Mutations != nil {
err = n.processMutation(e, proposal.Mutations)
} else if proposal.Membership != nil {
err = n.processMembership(e, proposal.Membership)
}
n.props.Done(proposal.Id, err)
<-pending // Release one.
}
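// processApplyCh consumes committed entries queued up by Run. Conf changes are
// applied inline; schema mutations are applied synchronously (after waiting for
// earlier entries) so that data mutations behind them see the new schema; the
// remaining work is handed off to process.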
func (n *node) processApplyCh() {
pending := make(chan struct{}, numPendingMutations)
	for e := range n.applyCh {
		mark := x.Mark{Index: e.Index, Done: true}

		if e.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			cc.Unmarshal(e.Data)
			if len(cc.Context) > 0 {
				var rc taskp.RaftContext
				x.Check(rc.Unmarshal(cc.Context))
				n.Connect(rc.Id, rc.Addr)
			}
			cs := n.Raft().ApplyConfChange(cc)
			n.SetConfState(cs)
			n.applied.Ch <- mark
			posting.SyncMarkFor(n.gid).Ch <- mark
			continue
		}
// We derive the schema here if it's not present
// Since raft committed logs are serialized, we can derive
// schema here without any locking
		var proposal taskp.Proposal
		x.Checkf(proposal.Unmarshal(e.Data[1:]), "Unable to parse entry: %+v", e)
if e.Type == raftpb.EntryNormal && proposal.Mutations != nil {
// process schema mutations before
if proposal.Mutations.Schema != nil {
// Wait for applied watermark to reach till previous index
// All mutations before this should use old schema and after this
// should use new schema
n.waitForAppliedMark(n.ctx, e.Index-1)
if err := n.processSchemaMutations(e, proposal.Mutations); err != nil {
n.applied.Ch <- mark
posting.SyncMarkFor(n.gid).Ch <- mark
				n.props.Done(proposal.Id, err)
				continue
			}
}
// stores a map of predicate and type of first mutation for each predicate
schemaMap := make(map[string]types.TypeID)
for _, edge := range proposal.Mutations.Edges {
if _, ok := schemaMap[edge.Attr]; !ok {
schemaMap[edge.Attr] = posting.TypeID(edge)
}
}
for attr, storageType := range schemaMap {
if _, err := schema.State().TypeOf(attr); err != nil {
// Schema doesn't exist
// Since committed entries are serialized, updateSchemaIfMissing is not
// needed, In future if schema needs to be changed, it would flow through
// raft so there won't be race conditions between read and update schema
updateSchemaType(attr, storageType, e.Index, n.raftContext.Group)
				}
			}
		}

		// Hand off the entry; pending bounds how many are processed concurrently.
		go n.process(e, pending)
	}
}
func (n *node) saveToStorage(s raftpb.Snapshot, h raftpb.HardState,
es []raftpb.Entry) {
if !raft.IsEmptySnap(s) {
le, err := n.store.LastIndex()
if err != nil {
log.Fatalf("While retrieving last index: %v\n", err)
}
if s.Metadata.Index <= le {
return
}
if err := n.store.ApplySnapshot(s); err != nil {
log.Fatalf("Applying snapshot: %v", err)
}
}
if !raft.IsEmptyHardState(h) {
n.store.SetHardState(h)
}
n.store.Append(es)
}
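// retrieveSnapshot streams this group's data from the peer that generated the
// snapshot, after evicting the in-memory posting lists, and then reloads the
// schema for the group.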
func (n *node) retrieveSnapshot(rc taskp.RaftContext) {
addr := n.peers.Get(rc.Id)
x.AssertTruef(addr != "", "Should have the address for %d", rc.Id)
pool := pools().get(addr)
x.AssertTruef(pool != nil, "Pool shouldn't be nil for address: %v for id: %v", addr, rc.Id)
// Get index of last committed.
lastIndex, err := n.store.LastIndex()
x.Checkf(err, "Error while getting last index")
// Wait for watermarks to sync since populateShard writes directly to db, otherwise
// the values might get overwritten
	// Safe to keep this line
	n.syncAllMarks(n.ctx, lastIndex)
// Need to clear pl's stored in memory for the case when retrieving snapshot with
// index greater than this node's last index
// Should invalidate/remove pl's to this group only ideally
	posting.EvictGroup(n.gid)
	x.Check2(populateShard(n.ctx, pool, n.gid))
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
	x.Checkf(schema.Refresh(n.gid), "Error while initializing schema")
}
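// Run is the main Raft loop: it ticks the state machine, persists entries and
// snapshots to the WAL and the in-memory storage, forwards outgoing messages,
// and queues committed entries onto applyCh.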
func (n *node) Run() {
	firstRun := true
	// Tick drives Raft's internal clock; the one-second interval here is an
	// assumption consistent with ElectionTick=10 and HeartbeatTick=1.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	rcBytes, err := n.raftContext.Marshal()
	x.Check(err)

	for {
		select {
		case <-ticker.C:
			n.Raft().Tick()
case rd := <-n.Raft().Ready():
x.Check(n.wal.StoreSnapshot(n.gid, rd.Snapshot))
x.Check(n.wal.Store(n.gid, rd.HardState, rd.Entries))
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
			for _, msg := range rd.Messages {
				// NOTE: We can do some optimizations here to drop messages.
				msg.Context = rcBytes
				n.send(msg)
			}
if !raft.IsEmptySnap(rd.Snapshot) {
// We don't send snapshots to other nodes. But, if we get one, that means
// either the leader is trying to bring us up to state; or this is the
// snapshot that I created. Only the former case should be handled.
				var rc taskp.RaftContext
				x.Check(rc.Unmarshal(rd.Snapshot.Data))
if rc.Id != n.id {
fmt.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
n.retrieveSnapshot(rc)
fmt.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
} else {
fmt.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
}
}
if len(rd.CommittedEntries) > 0 {
x.Trace(n.ctx, "Found %d committed entries", len(rd.CommittedEntries))
}
for _, entry := range rd.CommittedEntries {
				// Need applied watermarks for schema mutations also, for read linearizability.
status := x.Mark{Index: entry.Index, Done: false}
n.applied.Ch <- status
posting.SyncMarkFor(n.gid).Ch <- status
// Just queue up to be processed. Don't wait on them.
n.applyCh <- entry
}
			if firstRun && n.canCampaign {
				go n.Raft().Campaign(n.ctx)
				firstRun = false
			}
			n.Raft().Advance()

		case <-n.stop:
			close(n.done)
			return
		}
	}
}
func (n *node) Stop() {
select {
case n.stop <- struct{}{}:
case <-n.done:
// already stopped.
return
}
<-n.done // wait for Run to respond.
}
func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
return n.Raft().Step(ctx, msg)
}
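// snapshotPeriodically compacts the Raft log once a minute by snapshotting up
// to the group's synced watermark. Group zero is skipped because its state
// isn't persisted.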
func (n *node) snapshotPeriodically() {
if n.gid == 0 {
// Group zero is dedicated for membership information, whose state we don't persist.
// So, taking snapshots would end up deleting the RAFT entries that we need to
// regenerate the state on a crash. Therefore, don't take snapshots.
return
}
var prev string
// TODO: What should be ideal value for snapshotting ? If a node is lost due to network
// partition or some other issue for more than log compaction tick interval, then that
// node needs to fetch snapshot since logs would be truncated
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
		select {
		case <-ticker.C:
			water := posting.SyncMarkFor(n.gid)
			le := water.DoneUntil()
existing, err := n.store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
if le <= si {
msg := fmt.Sprintf("Current watermark %d <= previous snapshot %d. Skipping.", le, si)
if msg != prev {
prev = msg
fmt.Println(msg)
}
continue
}
msg := fmt.Sprintf("Taking snapshot for group: %d at watermark: %d\n", n.gid, le)
if msg != prev {
prev = msg
fmt.Println(msg)
}
			rcBytes, err := n.raftContext.Marshal()
			x.Check(err)
			s, err := n.store.CreateSnapshot(le, n.ConfState(), rcBytes)
x.Checkf(err, "While creating snapshot")
x.Checkf(n.store.Compact(le), "While compacting snapshot")
			x.Check(n.wal.StoreSnapshot(n.gid, s))

case <-n.done:
return
}
}
}
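// joinPeers connects to the current leader of this group and asks it, via
// JoinCluster, to propose a conf change that adds this node to the cluster.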
func (n *node) joinPeers() {
	// Get leader information for MY group.
	pid, paddr := groups().Leader(n.gid)
	n.Connect(pid, paddr)
fmt.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid)
pool := pools().get(paddr)
x.AssertTruef(pool != nil, "Unable to get pool for addr: %q for peer: %d", paddr, pid)
// Bring the instance up to speed first.
// Raft would decide whether snapshot needs to fetched or not
// so populateShard is not needed
// _, err := populateShard(n.ctx, pool, n.gid)
// x.Checkf(err, "Error while populating shard")
conn, err := pool.Get()
x.Check(err)
defer pool.Put(conn)
c := workerp.NewWorkerClient(conn)
x.Printf("Calling JoinCluster")
_, err = c.JoinCluster(n.ctx, n.raftContext)
x.Checkf(err, "Error while joining cluster")
x.Printf("Done with JoinCluster call\n")
}
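// initFromWal restores node state from the write-ahead log: any snapshot, the
// hard state, and the entries that follow. restart reports whether existing
// state was found, which decides between raft.RestartNode and raft.StartNode.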
func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
n.wal = wal
var sp raftpb.Snapshot
sp, rerr = wal.Snapshot(n.gid)
if rerr != nil {
return
}
var term, idx uint64
if !raft.IsEmptySnap(sp) {
fmt.Printf("Found Snapshot: %+v\n", sp)
restart = true
if rerr = n.store.ApplySnapshot(sp); rerr != nil {
return
}
term = sp.Metadata.Term
idx = sp.Metadata.Index
}
var hd raftpb.HardState
hd, rerr = wal.HardState(n.gid)
if rerr != nil {
return
}
if !raft.IsEmptyHardState(hd) {
fmt.Printf("Found hardstate: %+v\n", sp)
restart = true
if rerr = n.store.SetHardState(hd); rerr != nil {
return
}
}
var es []raftpb.Entry
es, rerr = wal.Entries(n.gid, term, idx)
if rerr != nil {
return
}
fmt.Printf("Group %d found %d entries\n", n.gid, len(es))
if len(es) > 0 {
restart = true
}
rerr = n.store.Append(es)
return
}
// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode(wal *raftwal.Wal) {
restart, err := n.initFromWal(wal)
x.Check(err)
if restart {
fmt.Printf("Restarting node for group: %d\n", n.gid)
n.SetRaft(raft.RestartNode(n.cfg))
fmt.Printf("New Node for group: %d\n", n.gid)
if groups().HasPeer(n.gid) {
n.joinPeers()
n.SetRaft(raft.StartNode(n.cfg, nil))
} else {
peers := []raft.Peer{{ID: n.id}}
n.SetRaft(raft.StartNode(n.cfg, peers))
// Trigger election, so this node can become the leader of this single-node cluster.
n.canCampaign = true
		}
	}
	go n.processApplyCh()
	go n.Run()
// TODO: Find a better way to snapshot, so we don't lose the membership
// state information, which isn't persisted.
	go n.snapshotPeriodically()
	go n.batchAndSendMessages()
}
func (n *node) AmLeader() bool {
r := n.Raft()
return r.Status().Lead == r.Status().ID
}
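// applyMessage connects to the sender (using the RaftContext carried in
// msg.Context) and steps the message into this group's Raft node. RaftMessage,
// below, unpacks a batched payload using the same 4-byte length framing that
// batchAndSendMessages writes, and applies each message in turn.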
func (w *grpcWorker) applyMessage(ctx context.Context, msg raftpb.Message) error {
	var rc taskp.RaftContext
	x.Check(rc.Unmarshal(msg.Context))
node := groups().Node(rc.Group)
// TODO: Handle the case where node isn't present for this group.
node.Connect(msg.From, rc.Addr)
c := make(chan error, 1)
go func() { c <- node.Step(ctx, msg) }()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-c:
		return err
	}
}

func (w *grpcWorker) RaftMessage(ctx context.Context, query *workerp.Payload) (*workerp.Payload, error) {
	if ctx.Err() != nil {
		return &workerp.Payload{}, ctx.Err()
	}
for idx := 0; idx < len(query.Data); {
x.AssertTruef(len(query.Data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(query.Data[idx:]))
		sz := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
		idx += 4
		if idx+sz > len(query.Data) {
			return &workerp.Payload{}, x.Errorf(
				"Invalid query. Size specified: %v. Size of array: %v\n", sz, len(query.Data))
		}
		var msg raftpb.Message
		if err := msg.Unmarshal(query.Data[idx : idx+sz]); err != nil {
			return &workerp.Payload{}, err
		}
if msg.Type != raftpb.MsgHeartbeat && msg.Type != raftpb.MsgHeartbeatResp {
fmt.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To)
}
if err := w.applyMessage(ctx, msg); err != nil {
return &workerp.Payload{}, err
		}
		idx += sz
	}
	return &workerp.Payload{}, nil
}
func (w *grpcWorker) JoinCluster(ctx context.Context, rc *taskp.RaftContext) (*workerp.Payload, error) {
if ctx.Err() != nil {
return &workerp.Payload{}, ctx.Err()
}
	// Best effort reject.
if _, found := groups().Server(rc.Id, rc.Group); found || rc.Id == *raftId {
return &workerp.Payload{}, x.Errorf(ErrorNodeIDExists)
}
node := groups().Node(rc.Group)
	if node == nil {
		return &workerp.Payload{}, nil
	}
c := make(chan error, 1)
go func() { c <- node.AddToCluster(ctx, rc.Id) }()
select {
case <-ctx.Done():
return &workerp.Payload{}, ctx.Err()
	case err := <-c:
		return &workerp.Payload{}, err
	}
}