diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 288ce76ec69f461fd6b9b2419c4cb6f757609d6c..8ac9dfe89be2b20e8ce811916741edc6224845a2 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -1,153 +1,153 @@ -package executor - -import ( - "bytes" - "context" - "io" - "net/http" - "sync" - "time" - - "go.uber.org/zap" - - "code.vereign.com/gaiax/tsa/golib/errors" - "code.vereign.com/gaiax/tsa/task/internal/service/task" -) - -const workers = 5 - -type Queue interface { - Poll(ctx context.Context) (*task.Task, error) - Ack(ctx context.Context, task *task.Task) error - Unack(ctx context.Context, task *task.Task) error -} - -type Executor struct { - queue Queue - workers int - pollInterval time.Duration - - httpClient *http.Client - logger *zap.Logger -} - -func New() *Executor { - return &Executor{ - httpClient: http.DefaultClient, - } -} - -func (e *Executor) Start(ctx context.Context) { - if e.workers == 0 { - e.workers = workers - } - - var wg sync.WaitGroup - for i := 0; i < e.workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - e.Worker(ctx) - }() - } - - <-ctx.Done() - wg.Wait() - - e.logger.Info("task executor stopped") -} - -func (e *Executor) Worker(ctx context.Context) { - for { - select { - case <-ctx.Done(): - e.logger.Debug("task execution worker stopped") - return - case <-time.After(e.pollInterval): - task, err := e.queue.Poll(ctx) - if err != nil { - if !errors.Is(errors.NotFound, err) { - e.logger.Error("error getting task from queue", zap.Error(err)) - } - continue - } - - logger := e.logger.With( - zap.String("taskID", task.ID), - zap.String("taskName", task.Name), - ) - - executedTask, err := e.Execute(ctx, task) - if err != nil { - logger.Error("error executing task", zap.Error(err)) - if err := e.queue.Unack(ctx, task); err != nil { - logger.Error("failed to unack task in queue", zap.Error(err)) - } - continue - } - - if len(executedTask.CacheKey) > 0 { - if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { - // TODO(penkovski): should task execution repeat if result cannot be cached? - logger.Error("failed to write task result in cache", zap.Error(err)) - } - } - - if err := e.saveTaskHistory(ctx, executedTask); err != nil { - logger.Error("failed to save task in history", zap.Error(err)) - } - } - } -} - -func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { - // TODO: evaluate request policy - t.StartedAt = time.Now() - status, response, err := e.execute(ctx, t) - if err != nil { - return nil, err - } - - t.State = task.Done - t.ResponseCode = status - t.Response = response - - if t.ResponsePolicy != "" { - // TODO: evaluate response policy - } - - if t.FinalPolicy != "" { - // TODO: evaluate finalizer policy - } - - t.FinishedAt = time.Now() - return t, nil -} - -func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { - req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) - if err != nil { - return 0, nil, errors.New("error creating http request", err) - } - - resp, err := e.httpClient.Do(req) - if err != nil { - return 0, nil, errors.New("error executing http request", err) - } - defer resp.Body.Close() - - response, err = io.ReadAll(resp.Body) - if err != nil { - return 0, nil, errors.New("error reading response body", err) - } - - return resp.StatusCode, response, nil -} - -func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { - return errors.New("not implemented") -} - -func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { - return errors.New("not implemented") -} +//package executor +// +//import ( +// "bytes" +// "context" +// "io" +// "net/http" +// "sync" +// "time" +// +// "go.uber.org/zap" +// +// "code.vereign.com/gaiax/tsa/golib/errors" +// "code.vereign.com/gaiax/tsa/task/internal/service/task" +//) +// +//const workers = 5 +// +//type Queue interface { +// Poll(ctx context.Context) (*task.Task, error) +// Ack(ctx context.Context, task *task.Task) error +// Unack(ctx context.Context, task *task.Task) error +//} +// +//type Executor struct { +// queue Queue +// workers int +// pollInterval time.Duration +// +// httpClient *http.Client +// logger *zap.Logger +//} +// +//func New() *Executor { +// return &Executor{ +// httpClient: http.DefaultClient, +// } +//} +// +//func (e *Executor) Start(ctx context.Context) { +// if e.workers == 0 { +// e.workers = workers +// } +// +// var wg sync.WaitGroup +// for i := 0; i < e.workers; i++ { +// wg.Add(1) +// go func() { +// defer wg.Done() +// e.Worker(ctx) +// }() +// } +// +// <-ctx.Done() +// wg.Wait() +// +// e.logger.Info("task executor stopped") +//} +// +//func (e *Executor) Worker(ctx context.Context) { +// for { +// select { +// case <-ctx.Done(): +// e.logger.Debug("task execution worker stopped") +// return +// case <-time.After(e.pollInterval): +// task, err := e.queue.Poll(ctx) +// if err != nil { +// if !errors.Is(errors.NotFound, err) { +// e.logger.Error("error getting task from queue", zap.Error(err)) +// } +// continue +// } +// +// logger := e.logger.With( +// zap.String("taskID", task.ID), +// zap.String("taskName", task.Name), +// ) +// +// executedTask, err := e.Execute(ctx, task) +// if err != nil { +// logger.Error("error executing task", zap.Error(err)) +// if err := e.queue.Unack(ctx, task); err != nil { +// logger.Error("failed to unack task in queue", zap.Error(err)) +// } +// continue +// } +// +// if len(executedTask.CacheKey) > 0 { +// if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { +// // TODO(penkovski): should task execution repeat if result cannot be cached? +// logger.Error("failed to write task result in cache", zap.Error(err)) +// } +// } +// +// if err := e.saveTaskHistory(ctx, executedTask); err != nil { +// logger.Error("failed to save task in history", zap.Error(err)) +// } +// } +// } +//} +// +//func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { +// // TODO: evaluate request policy +// t.StartedAt = time.Now() +// status, response, err := e.execute(ctx, t) +// if err != nil { +// return nil, err +// } +// +// t.State = task.Done +// t.ResponseCode = status +// t.Response = response +// +// if t.ResponsePolicy != "" { +// // TODO: evaluate response policy +// } +// +// if t.FinalPolicy != "" { +// // TODO: evaluate finalizer policy +// } +// +// t.FinishedAt = time.Now() +// return t, nil +//} +// +//func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { +// req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) +// if err != nil { +// return 0, nil, errors.New("error creating http request", err) +// } +// +// resp, err := e.httpClient.Do(req) +// if err != nil { +// return 0, nil, errors.New("error executing http request", err) +// } +// defer resp.Body.Close() +// +// response, err = io.ReadAll(resp.Body) +// if err != nil { +// return 0, nil, errors.New("error reading response body", err) +// } +// +// return resp.StatusCode, response, nil +//} +// +//func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { +// return errors.New("not implemented") +//} +// +//func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { +// return errors.New("not implemented") +//} diff --git a/internal/executor/executor2.go b/internal/executor/executor2.go new file mode 100644 index 0000000000000000000000000000000000000000..ec7f63e80ac5fefd17e0ca4335c1fcbb7c97532f --- /dev/null +++ b/internal/executor/executor2.go @@ -0,0 +1,74 @@ +package executor + +import ( + "context" + "net/http" + "sync" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +const workers = 5 + +type Queue interface { + Poll(ctx context.Context) (*task.Task, error) + Ack(ctx context.Context, task *task.Task) error + Unack(ctx context.Context, task *task.Task) error +} + +type Executor struct { + queue Queue + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New() *Executor { + return &Executor{} +} + +func (e *Executor) Start(ctx context.Context) error { + defer e.logger.Info("task executor stopped") + + if e.workers == 0 { + e.workers = workers + } + + var wg sync.WaitGroup + tasks := make(chan *task.Task) + for i := 0; i < e.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + worker := newWorker(tasks, e.queue, e.httpClient, e.logger) + worker.Start(ctx) + }() + } + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(e.pollInterval): + t, err := e.queue.Poll(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + e.logger.Error("error getting task from queue", zap.Error(err)) + } + continue + } + tasks <- t // send task to the workers for execution + } + } + + wg.Wait() // wait all workers to stop + + return ctx.Err() +} diff --git a/internal/executor/worker.go b/internal/executor/worker.go new file mode 100644 index 0000000000000000000000000000000000000000..65c9246905ae2d5b8ace5ccf979a5c3d130cd049 --- /dev/null +++ b/internal/executor/worker.go @@ -0,0 +1,75 @@ +package executor + +import ( + "context" + "fmt" + "net/http" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type Worker struct { + tasks chan *task.Task + queue Queue + httpClient *http.Client + logger *zap.Logger +} + +func newWorker(tasks chan *task.Task, queue Queue, httpClient *http.Client, logger *zap.Logger) *Worker { + return &Worker{ + tasks: tasks, + queue: queue, + httpClient: httpClient, + logger: logger, + } +} + +func (w *Worker) Start(ctx context.Context) { + defer w.logger.Debug("task worker stopped") + + for { + select { + case <-ctx.Done(): + return + case t := <-w.tasks: + logger := w.logger.With( + zap.String("taskID", t.ID), + zap.String("taskName", t.Name), + ) + + executed, err := w.execute(ctx, t) + if err != nil { + logger.Error("error executing task", zap.Error(err)) + if err := w.queue.Unack(ctx, t); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + + if err := w.cacheResult(ctx, executed); err != nil { + // TODO: should we remove the task from queue or it should be retried? + logger.Error("error caching task result", zap.Error(err)) + continue + } + + if err := w.saveTaskHistory(ctx, executed); err != nil { + logger.Error("error saving task history", zap.Error(err)) + continue + } + } + } +} + +func (w *Worker) execute(ctx context.Context, task *task.Task) (*task.Task, error) { + return nil, fmt.Errorf("not implemented") +} + +func (w *Worker) cacheResult(ctx context.Context, task *task.Task) error { + return fmt.Errorf("not implemented") +} + +func (w *Worker) saveTaskHistory(ctx context.Context, task *task.Task) error { + return fmt.Errorf("not implemented") +}