diff --git a/conn/raft_server.go b/conn/raft_server.go index fdea1ce51152feef5734ecf6911fc891a436ed73..53ba97b88be7fda71fd3ab38a3be640bd18a10f2 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -167,24 +167,6 @@ func (w *RaftServer) JoinCluster(ctx context.Context, return &api.Payload{}, err } -var () - -func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error { - node := w.GetNode() - if node == nil || node.Raft() == nil { - return ErrNoNode - } - - c := make(chan error, 1) - go func() { c <- node.Raft().Step(ctx, msg) }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - return err - } -} func (w *RaftServer) RaftMessage(ctx context.Context, batch *intern.RaftBatch) (*api.Payload, error) { if ctx.Err() != nil { @@ -194,7 +176,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context, rc := batch.GetContext() if rc != nil { n := w.GetNode() - if n == nil { + if n == nil || n.Raft() == nil { return &api.Payload{}, ErrNoNode } n.Connect(rc.Id, rc.Addr) @@ -203,6 +185,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context, return &api.Payload{}, nil } data := batch.Payload.Data + raft := w.GetNode().Raft() for idx := 0; idx < len(data); { x.AssertTruef(len(data[idx:]) >= 4, @@ -219,7 +202,8 @@ func (w *RaftServer) RaftMessage(ctx context.Context, if err := msg.Unmarshal(data[idx : idx+sz]); err != nil { x.Check(err) } - if err := w.applyMessage(ctx, msg); err != nil { + // This should be done in order, and not via a goroutine. + if err := raft.Step(ctx, msg); err != nil { return &api.Payload{}, err } idx += sz