Skip to content
Snippets Groups Projects
Commit 7010e576 authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

Merge branch '18-limit-task-execution-attempts' into 'main'

Add max retry limit for task execution

Closes #18

See merge request !13
parents 05b7f96e 0cc81955
No related branches found
No related tags found
1 merge request!13Add max retry limit for task execution
Pipeline #51928 passed with stages
in 55 seconds
......@@ -84,6 +84,7 @@ func main() {
cache,
cfg.Executor.Workers,
cfg.Executor.PollInterval,
cfg.Executor.MaxTaskRetries,
httpClient(),
logger,
)
......
# Task service - Queue
### Why the Queue is Database
......
......@@ -50,12 +50,12 @@ _Task definition_ contains: | `requestPolicy` only | `url` and `method` only | B
### Task Executor Configuration
There are three environment variables that control the behavior of the task executor.
```shell
EXECUTOR_WORKERS="5"
EXECUTOR_POLL_INTERVAL="1s"
EXECUTOR_MAX_TASK_RETRIES="10"
```
Poll interval specifies how often the executor will try to fetch a *pending* task
......@@ -67,6 +67,10 @@ If there are multiple instances (pods) of the service, multiply by their number
If this is not enough, the poll interval can be decreased, or we can slightly modify
the polling function to fetch many tasks at once (and also increase the number of workers).
Maximum task retries specifies how many failed execution attempts are allowed for a single
task before workers remove it from the queue. With the example configuration above, a task
that has failed 10 times is removed from the queue and not executed again.
To learn more about the queue and why we use database as queue see [queue](queue.md)
### Task Storage
......
......@@ -32,8 +32,9 @@ type policyConfig struct {
}
// executorConfig holds the task-executor settings, populated from
// environment variables via envconfig struct tags.
type executorConfig struct {
	// Workers is the number of concurrent task-executing workers.
	Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"`
	// PollInterval is how often the executor polls for a pending task.
	PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
	// MaxTaskRetries is the number of failed execution attempts allowed
	// for a single task before it is removed from the queue.
	MaxTaskRetries int `envconfig:"EXECUTOR_MAX_TASK_RETRIES" default:"10"`
}
type listExecutorConfig struct {
......
......@@ -34,12 +34,13 @@ type Cache interface {
}
type Executor struct {
queue Queue
policy Policy
storage Storage
cache Cache
workers int
pollInterval time.Duration
queue Queue
policy Policy
storage Storage
cache Cache
workers int
pollInterval time.Duration
maxTaskRetries int
httpClient *http.Client
logger *zap.Logger
......@@ -52,18 +53,20 @@ func New(
cache Cache,
workers int,
pollInterval time.Duration,
maxTaskRetries int,
httpClient *http.Client,
logger *zap.Logger,
) *Executor {
return &Executor{
queue: queue,
policy: policy,
storage: storage,
cache: cache,
workers: workers,
pollInterval: pollInterval,
httpClient: httpClient,
logger: logger,
queue: queue,
policy: policy,
storage: storage,
cache: cache,
workers: workers,
pollInterval: pollInterval,
maxTaskRetries: maxTaskRetries,
httpClient: httpClient,
logger: logger,
}
}
......@@ -76,7 +79,7 @@ func (e *Executor) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger)
worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.maxTaskRetries, e.httpClient, e.logger)
worker.Start(ctx)
}()
}
......
......@@ -14,13 +14,14 @@ import (
)
// Worker consumes tasks from the tasks channel and executes them,
// persisting results via storage and cache.
type Worker struct {
	tasks   chan *taskpkg.Task
	queue   Queue
	policy  Policy
	storage Storage
	cache   Cache
	// maxTaskRetries caps the number of failed execution attempts per
	// task before the worker acks (removes) the task from the queue.
	maxTaskRetries int
	httpClient     *http.Client
	logger         *zap.Logger
}
func newWorker(
......@@ -29,17 +30,19 @@ func newWorker(
policy Policy,
storage Storage,
cache Cache,
maxTaskRetries int,
httpClient *http.Client,
logger *zap.Logger,
) *Worker {
return &Worker{
tasks: tasks,
queue: queue,
policy: policy,
storage: storage,
cache: cache,
httpClient: httpClient,
logger: logger,
tasks: tasks,
queue: queue,
policy: policy,
storage: storage,
cache: cache,
maxTaskRetries: maxTaskRetries,
httpClient: httpClient,
logger: logger,
}
}
......@@ -56,6 +59,15 @@ func (w *Worker) Start(ctx context.Context) {
zap.String("taskName", t.Name),
)
if t.Retries >= w.maxTaskRetries {
if err := w.queue.Ack(ctx, t); err != nil {
logger.Error("failed to ack task in queue", zap.Error(err))
} else {
logger.Error("task removed from queue due to too many failed executions")
}
continue
}
executed, err := w.Execute(ctx, t)
if err != nil {
logger.Error("error executing task", zap.Error(err))
......
......@@ -139,6 +139,11 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) {
list.State = taskpkg.Failed
}
state.Groups = append(state.Groups, groupState)
// Mark the task list as `Failed` if any group's state is `Failed`.
if *groupState.Status == taskpkg.Failed {
list.State = taskpkg.Failed
}
}
if list.State != taskpkg.Failed {
......
......@@ -39,6 +39,7 @@ type Task struct {
FinalPolicy string `json:"finalPolicy"` // FinalPolicy to be executed on the task response.
CacheNamespace string `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key.
CacheScope string `json:"cacheScope"` // CacheScope if set, is used for constructing cache key.
Retries int `json:"retries"` // Retries is the number of failed attempts to execute this task
CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time.
StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time.
FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done.
......
......@@ -113,7 +113,7 @@ func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
// it can be retrieved for processing again.
// Unack returns a task to the queue by resetting its state to Created so
// it can be retrieved for processing again. It also increments the task's
// retry counter so the executor can enforce the max-retries limit.
func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
	filter := bson.M{"id": t.ID}
	// Count this failed attempt while re-queueing the task.
	update := bson.M{"$set": bson.M{"state": task.Created, "retries": t.Retries + 1}}
	_, err := s.tasks.UpdateOne(ctx, filter, update)
	return err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment