Skip to content
Snippets Groups Projects
Commit a6ed68a5 authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Save executed tasks after successfull execution to the history collection

parent b329a0df
No related branches found
No related tags found
1 merge request!4Task execution of independent tasks
......@@ -72,6 +72,7 @@ func main() {
executor := executor.New(
storage,
policy,
storage,
cfg.Executor.Workers,
cfg.Executor.PollInterval,
httpClient(),
......
......@@ -24,9 +24,14 @@ type Queue interface {
Unack(ctx context.Context, task *task.Task) error
}
type Storage interface {
SaveTaskHistory(ctx context.Context, task *task.Task) error
}
type Executor struct {
queue Queue
policy Policy
storage Storage
workers int
pollInterval time.Duration
......@@ -37,6 +42,7 @@ type Executor struct {
func New(
queue Queue,
policy Policy,
storage Storage,
workers int,
pollInterval time.Duration,
httpClient *http.Client,
......@@ -45,6 +51,7 @@ func New(
return &Executor{
queue: queue,
policy: policy,
storage: storage,
workers: workers,
pollInterval: pollInterval,
httpClient: httpClient,
......@@ -61,7 +68,7 @@ func (e *Executor) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
worker := newWorker(tasks, e.queue, e.policy, e.storage, e.httpClient, e.logger)
worker.Start(ctx)
}()
}
......
......@@ -3,7 +3,6 @@ package executor
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
......@@ -18,15 +17,17 @@ type Worker struct {
tasks chan *taskpkg.Task
queue Queue
policy Policy
storage Storage
httpClient *http.Client
logger *zap.Logger
}
func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, httpClient *http.Client, logger *zap.Logger) *Worker {
func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, storage Storage, httpClient *http.Client, logger *zap.Logger) *Worker {
return &Worker{
tasks: tasks,
queue: queue,
policy: policy,
storage: storage,
httpClient: httpClient,
logger: logger,
}
......@@ -54,7 +55,8 @@ func (w *Worker) Start(ctx context.Context) {
continue
}
if err := w.saveTaskHistory(ctx, executed); err != nil {
if err := w.storage.SaveTaskHistory(ctx, executed); err != nil {
// TODO: is this fatal error, should the task be re-executed?
logger.Error("error saving task history", zap.Error(err))
continue
}
......@@ -130,7 +132,3 @@ func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, respon
return resp.StatusCode, response, nil
}
func (w *Worker) saveTaskHistory(ctx context.Context, task *taskpkg.Task) error {
return fmt.Errorf("not implemented")
}
......@@ -106,3 +106,9 @@ func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
_, err := s.tasks.UpdateOne(ctx, filter, update)
return err
}
// SaveTaskHistory saves a task to the `tasksHistory` collection.
func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error {
_, err := s.tasksHistory.InsertOne(ctx, task)
return err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment