diff --git a/cmd/task/main.go b/cmd/task/main.go index d721e0ce5e28067f84f04a7ff67aaff4d8477b8d..d293aa64636f22d22b65438e0bd1056df823e6e7 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -72,6 +72,7 @@ func main() { executor := executor.New( storage, policy, + storage, cfg.Executor.Workers, cfg.Executor.PollInterval, httpClient(), diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f49ae4d0316ac5702d7916ed659aae2917d35c26..762df345f7b5c3f8fa3209266c55305f0d73665f 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -24,9 +24,14 @@ type Queue interface { Unack(ctx context.Context, task *task.Task) error } +type Storage interface { + SaveTaskHistory(ctx context.Context, task *task.Task) error +} + type Executor struct { queue Queue policy Policy + storage Storage workers int pollInterval time.Duration @@ -37,6 +42,7 @@ type Executor struct { func New( queue Queue, policy Policy, + storage Storage, workers int, pollInterval time.Duration, httpClient *http.Client, @@ -45,6 +51,7 @@ func New( return &Executor{ queue: queue, policy: policy, + storage: storage, workers: workers, pollInterval: pollInterval, httpClient: httpClient, @@ -61,7 +68,7 @@ func (e *Executor) Start(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger) + worker := newWorker(tasks, e.queue, e.policy, e.storage, e.httpClient, e.logger) worker.Start(ctx) }() } diff --git a/internal/executor/worker.go b/internal/executor/worker.go index c58bcaccf059099fe91207074e418ca09a238c55..deec231362a2a4878671b40198e3777241a3b96a 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -3,7 +3,6 @@ package executor import ( "bytes" "context" - "fmt" "io" "net/http" "time" @@ -18,15 +17,17 @@ type Worker struct { tasks chan *taskpkg.Task queue Queue policy Policy + storage Storage httpClient *http.Client logger *zap.Logger } -func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, httpClient *http.Client, logger *zap.Logger) *Worker { +func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, storage Storage, httpClient *http.Client, logger *zap.Logger) *Worker { return &Worker{ tasks: tasks, queue: queue, policy: policy, + storage: storage, httpClient: httpClient, logger: logger, } @@ -54,7 +55,8 @@ func (w *Worker) Start(ctx context.Context) { continue } - if err := w.saveTaskHistory(ctx, executed); err != nil { + if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { + // TODO: is this fatal error, should the task be re-executed? logger.Error("error saving task history", zap.Error(err)) continue } @@ -130,7 +132,3 @@ func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, respon return resp.StatusCode, response, nil } - -func (w *Worker) saveTaskHistory(ctx context.Context, task *taskpkg.Task) error { - return fmt.Errorf("not implemented") -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index d92e4226f3e62969c411a5913a2e353072ff4568..028ef4c00ba3e17e658e9652f96f9424ed7db414 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -106,3 +106,9 @@ func (s *Storage) Unack(ctx context.Context, t *task.Task) error { _, err := s.tasks.UpdateOne(ctx, filter, update) return err } + +// SaveTaskHistory saves a task to the `tasksHistory` collection. +func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error { + _, err := s.tasksHistory.InsertOne(ctx, task) + return err +}