Skip to content
Snippets Groups Projects
Unverified Commit d9c3671f authored by Manish R Jain's avatar Manish R Jain
Browse files

Run raft.Step in serial order, without using goroutines.

parent 351c15fd
Branches
Tags
No related merge requests found
...@@ -167,24 +167,6 @@ func (w *RaftServer) JoinCluster(ctx context.Context, ...@@ -167,24 +167,6 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
return &api.Payload{}, err return &api.Payload{}, err
} }
var ()
func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
node := w.GetNode()
if node == nil || node.Raft() == nil {
return ErrNoNode
}
c := make(chan error, 1)
go func() { c <- node.Raft().Step(ctx, msg) }()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-c:
return err
}
}
func (w *RaftServer) RaftMessage(ctx context.Context, func (w *RaftServer) RaftMessage(ctx context.Context,
batch *intern.RaftBatch) (*api.Payload, error) { batch *intern.RaftBatch) (*api.Payload, error) {
if ctx.Err() != nil { if ctx.Err() != nil {
...@@ -194,7 +176,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context, ...@@ -194,7 +176,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context,
rc := batch.GetContext() rc := batch.GetContext()
if rc != nil { if rc != nil {
n := w.GetNode() n := w.GetNode()
if n == nil { if n == nil || n.Raft() == nil {
return &api.Payload{}, ErrNoNode return &api.Payload{}, ErrNoNode
} }
n.Connect(rc.Id, rc.Addr) n.Connect(rc.Id, rc.Addr)
...@@ -203,6 +185,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context, ...@@ -203,6 +185,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context,
return &api.Payload{}, nil return &api.Payload{}, nil
} }
data := batch.Payload.Data data := batch.Payload.Data
raft := w.GetNode().Raft()
for idx := 0; idx < len(data); { for idx := 0; idx < len(data); {
x.AssertTruef(len(data[idx:]) >= 4, x.AssertTruef(len(data[idx:]) >= 4,
...@@ -219,7 +202,8 @@ func (w *RaftServer) RaftMessage(ctx context.Context, ...@@ -219,7 +202,8 @@ func (w *RaftServer) RaftMessage(ctx context.Context,
if err := msg.Unmarshal(data[idx : idx+sz]); err != nil { if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
x.Check(err) x.Check(err)
} }
if err := w.applyMessage(ctx, msg); err != nil { // This should be done in order, and not via a goroutine.
if err := raft.Step(ctx, msg); err != nil {
return &api.Payload{}, err return &api.Payload{}, err
} }
idx += sz idx += sz
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment