diff --git a/cmd/task/main.go b/cmd/task/main.go index d65bbc215762363852ff9b0e50f28c81a478e1e6..cf910508a2bf2eb058a167463fb18de06aadf734 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -84,6 +84,7 @@ func main() { cache, cfg.Executor.Workers, cfg.Executor.PollInterval, + cfg.Executor.MaxTaskRetries, httpClient(), logger, ) diff --git a/docs/queue.md b/docs/queue.md index dc976358d41f23a9ce6cc0115040394c56bd8eb9..bcf05b85713d16e6370e475da9dd91d7b42c94e3 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -1,4 +1,4 @@ -#Task service - Queue +# Task service - Queue ### Why the Queue is Database diff --git a/docs/task.md b/docs/task.md index c023af206b6bad0d539b1d9b2d1b47311b7846ef..fb47e7d01f77a38e82670c589064960ae8ddfb47 100644 --- a/docs/task.md +++ b/docs/task.md @@ -38,12 +38,12 @@ execution will be stored in the TSA Cache after the task is completed. ### Task Executor Configuration -There are two environment variables that control the level of concurrency -of the task executor. +There are three environment variables that control the behavior of the task executor. ```shell EXECUTOR_WORKERS="5" EXECUTOR_POLL_INTERVAL="1s" +EXECUTOR_MAX_TASK_RETRIES="10" ``` Poll interval specifies how often the executor will try to fetch a *pending* task @@ -55,6 +55,10 @@ If there are multiple instances (pods) of the service, multiply by their number If this is not enough, the poll interval can be decreased, or we can slightly modify the polling function to fetch many tasks at once (and also increase the number of workers). +Maximum task retries specifies how many failed attempts to execute a single task are going +to be made by workers before the task is removed from the queue. In the example above workers are going to +execute a task 10 times and fail before the task is removed. + To learn more about the queue and why we use database as queue see [queue](queue.md) ### Task Storage diff --git a/internal/config/config.go b/internal/config/config.go index 42b6cce22080f379c5bdc7095eee55bb894df94c..a4127c5cbdb91b08118964ddcb4aa4dfcee571eb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,8 +32,9 @@ type policyConfig struct { } type executorConfig struct { - Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` - PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` + Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` + MaxTaskRetries int `envconfig:"EXECUTOR_MAX_TASK_RETRIES" default:"10"` } type listExecutorConfig struct { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c59bc745e36a59664953e36bc75be0da457fc5e6..eb3f82e394727a1c01c3e6f123e966e02882fd28 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -34,12 +34,13 @@ type Cache interface { } type Executor struct { - queue Queue - policy Policy - storage Storage - cache Cache - workers int - pollInterval time.Duration + queue Queue + policy Policy + storage Storage + cache Cache + workers int + pollInterval time.Duration + maxTaskRetries int httpClient *http.Client logger *zap.Logger @@ -52,18 +53,20 @@ func New( cache Cache, workers int, pollInterval time.Duration, + maxTaskRetries int, httpClient *http.Client, logger *zap.Logger, ) *Executor { return &Executor{ - queue: queue, - policy: policy, - storage: storage, - cache: cache, - workers: workers, - pollInterval: pollInterval, - httpClient: httpClient, - logger: logger, + queue: queue, + policy: policy, + storage: storage, + cache: cache, + workers: workers, + pollInterval: pollInterval, + maxTaskRetries: maxTaskRetries, + httpClient: httpClient, + logger: logger, } } @@ -76,7 +79,7 @@ func (e *Executor) Start(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger) + worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.maxTaskRetries, e.httpClient, e.logger) worker.Start(ctx) }() } diff --git a/internal/executor/worker.go b/internal/executor/worker.go index aea032d32ef535db6a96d4b28a94335cd9379f1c..1f30bf145f2663d0ea7b23f262bd6a7462f5ad95 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -14,13 +14,14 @@ import ( ) type Worker struct { - tasks chan *taskpkg.Task - queue Queue - policy Policy - storage Storage - cache Cache - httpClient *http.Client - logger *zap.Logger + tasks chan *taskpkg.Task + queue Queue + policy Policy + storage Storage + cache Cache + maxTaskRetries int + httpClient *http.Client + logger *zap.Logger } func newWorker( @@ -29,17 +30,19 @@ func newWorker( policy Policy, storage Storage, cache Cache, + maxTaskRetries int, httpClient *http.Client, logger *zap.Logger, ) *Worker { return &Worker{ - tasks: tasks, - queue: queue, - policy: policy, - storage: storage, - cache: cache, - httpClient: httpClient, - logger: logger, + tasks: tasks, + queue: queue, + policy: policy, + storage: storage, + cache: cache, + maxTaskRetries: maxTaskRetries, + httpClient: httpClient, + logger: logger, } } @@ -56,6 +59,15 @@ func (w *Worker) Start(ctx context.Context) { zap.String("taskName", t.Name), ) + if t.Retries >= w.maxTaskRetries { + if err := w.queue.Ack(ctx, t); err != nil { + logger.Error("failed to ack task in queue", zap.Error(err)) + } else { + logger.Error("task removed from queue due to too many failed executions") + } + continue + } + executed, err := w.Execute(ctx, t) if err != nil { logger.Error("error executing task", zap.Error(err)) diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go index 29615ad89a23598290f01e7b0e9055485d74e1a0..6deb9e9bccde2496a542625fe527e1fcbdd84e18 100644 --- a/internal/listexecutor/listexecutor.go +++ b/internal/listexecutor/listexecutor.go @@ -139,6 +139,11 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { list.State = taskpkg.Failed } state.Groups = append(state.Groups, groupState) + + //mark taskList as `Failed` if the group's state is `Failed` + if *groupState.Status == taskpkg.Failed { + list.State = taskpkg.Failed + } } if list.State != taskpkg.Failed { diff --git a/internal/service/task/task.go b/internal/service/task/task.go index f1d4db423724579cb1a3830488d2c0984e25c11d..8847b7d41dad525a5af9bc122ede658d44dc5569 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -39,6 +39,7 @@ type Task struct { FinalPolicy string `json:"finalPolicy"` // FinalPolicy to be executed on the task response. CacheNamespace string `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key. CacheScope string `json:"cacheScope"` // CacheScope if set, is used for constructing cache key. + Retries int `json:"retries"` // Retries is the number of failed attempts to execute this task CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time. StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time. FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 4a8d1ffaf7089f11273e316f45336238d833579e..520348f3006c25e63469e8f0d02357a3fc1520c2 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -113,7 +113,7 @@ func (s *Storage) Ack(ctx context.Context, task *task.Task) error { // it can be retrieved for processing again. func (s *Storage) Unack(ctx context.Context, t *task.Task) error { filter := bson.M{"id": t.ID} - update := bson.M{"$set": bson.M{"state": task.Created}} + update := bson.M{"$set": bson.M{"state": task.Created, "retries": t.Retries + 1}} _, err := s.tasks.UpdateOne(ctx, filter, update) return err }