From 0cc8195500a8d9b60f62eb68bcb63f7b71896092 Mon Sep 17 00:00:00 2001
From: Yordan Kinkov <yordan.kinkov@vereign.com>
Date: Mon, 27 Jun 2022 14:27:55 +0300
Subject: [PATCH] #18 add max retry limit for task execution

---
 cmd/task/main.go                      |  1 +
 docs/queue.md                         |  2 +-
 docs/task.md                          |  8 ++++--
 internal/config/config.go             |  5 ++--
 internal/executor/executor.go         | 33 ++++++++++++----------
 internal/executor/worker.go           | 40 +++++++++++++++++----------
 internal/listexecutor/listexecutor.go |  5 ++++
 internal/service/task/task.go         |  1 +
 internal/storage/storage.go           |  2 +-
 9 files changed, 62 insertions(+), 35 deletions(-)

diff --git a/cmd/task/main.go b/cmd/task/main.go
index d65bbc2..cf91050 100644
--- a/cmd/task/main.go
+++ b/cmd/task/main.go
@@ -84,6 +84,7 @@ func main() {
 		cache,
 		cfg.Executor.Workers,
 		cfg.Executor.PollInterval,
+		cfg.Executor.MaxTaskRetries,
 		httpClient(),
 		logger,
 	)
diff --git a/docs/queue.md b/docs/queue.md
index dc97635..bcf05b8 100644
--- a/docs/queue.md
+++ b/docs/queue.md
@@ -1,4 +1,4 @@
-#Task service - Queue
+# Task service - Queue
 
 ### Why the Queue is Database
 
diff --git a/docs/task.md b/docs/task.md
index c023af2..fb47e7d 100644
--- a/docs/task.md
+++ b/docs/task.md
@@ -38,12 +38,12 @@ execution will be stored in the TSA Cache after the task is completed.
 
 ### Task Executor Configuration
 
-There are two environment variables that control the level of concurrency
-of the task executor.
+There are three environment variables that control the behavior of the task executor.
 
 ```shell
 EXECUTOR_WORKERS="5"
 EXECUTOR_POLL_INTERVAL="1s"
+EXECUTOR_MAX_TASK_RETRIES="10"
 ```
 
 Poll interval specifies how often the executor will try to fetch a *pending* task
@@ -55,6 +55,10 @@ If there are multiple instances (pods) of the service, multiply by their number
 If this is not enough, the poll interval can be decreased, or we can slightly modify
 the polling function to fetch many tasks at once (and also increase the number of workers).
 
+Maximum task retries specifies how many failed attempts to execute a single task are going
+to be made by workers before the task is removed from the queue. In the example above workers are going to
+execute a task 10 times and fail before the task is removed.
+
 To learn more about the queue and why we use database as queue see [queue](queue.md)
 
 ### Task Storage
diff --git a/internal/config/config.go b/internal/config/config.go
index 42b6cce..a4127c5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -32,8 +32,9 @@ type policyConfig struct {
 }
 
 type executorConfig struct {
-	Workers      int           `envconfig:"EXECUTOR_WORKERS" default:"5"`
-	PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
+	Workers        int           `envconfig:"EXECUTOR_WORKERS" default:"5"`
+	PollInterval   time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
+	MaxTaskRetries int           `envconfig:"EXECUTOR_MAX_TASK_RETRIES" default:"10"`
 }
 
 type listExecutorConfig struct {
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c59bc74..eb3f82e 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -34,12 +34,13 @@ type Cache interface {
 }
 
 type Executor struct {
-	queue        Queue
-	policy       Policy
-	storage      Storage
-	cache        Cache
-	workers      int
-	pollInterval time.Duration
+	queue          Queue
+	policy         Policy
+	storage        Storage
+	cache          Cache
+	workers        int
+	pollInterval   time.Duration
+	maxTaskRetries int
 
 	httpClient *http.Client
 	logger     *zap.Logger
@@ -52,18 +53,20 @@ func New(
 	cache Cache,
 	workers int,
 	pollInterval time.Duration,
+	maxTaskRetries int,
 	httpClient *http.Client,
 	logger *zap.Logger,
 ) *Executor {
 	return &Executor{
-		queue:        queue,
-		policy:       policy,
-		storage:      storage,
-		cache:        cache,
-		workers:      workers,
-		pollInterval: pollInterval,
-		httpClient:   httpClient,
-		logger:       logger,
+		queue:          queue,
+		policy:         policy,
+		storage:        storage,
+		cache:          cache,
+		workers:        workers,
+		pollInterval:   pollInterval,
+		maxTaskRetries: maxTaskRetries,
+		httpClient:     httpClient,
+		logger:         logger,
 	}
 }
 
@@ -76,7 +79,7 @@ func (e *Executor) Start(ctx context.Context) error {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger)
+			worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.maxTaskRetries, e.httpClient, e.logger)
 			worker.Start(ctx)
 		}()
 	}
diff --git a/internal/executor/worker.go b/internal/executor/worker.go
index aea032d..1f30bf1 100644
--- a/internal/executor/worker.go
+++ b/internal/executor/worker.go
@@ -14,13 +14,14 @@ import (
 )
 
 type Worker struct {
-	tasks      chan *taskpkg.Task
-	queue      Queue
-	policy     Policy
-	storage    Storage
-	cache      Cache
-	httpClient *http.Client
-	logger     *zap.Logger
+	tasks          chan *taskpkg.Task
+	queue          Queue
+	policy         Policy
+	storage        Storage
+	cache          Cache
+	maxTaskRetries int
+	httpClient     *http.Client
+	logger         *zap.Logger
 }
 
 func newWorker(
@@ -29,17 +30,19 @@ func newWorker(
 	policy Policy,
 	storage Storage,
 	cache Cache,
+	maxTaskRetries int,
 	httpClient *http.Client,
 	logger *zap.Logger,
 ) *Worker {
 	return &Worker{
-		tasks:      tasks,
-		queue:      queue,
-		policy:     policy,
-		storage:    storage,
-		cache:      cache,
-		httpClient: httpClient,
-		logger:     logger,
+		tasks:          tasks,
+		queue:          queue,
+		policy:         policy,
+		storage:        storage,
+		cache:          cache,
+		maxTaskRetries: maxTaskRetries,
+		httpClient:     httpClient,
+		logger:         logger,
 	}
 }
 
@@ -56,6 +59,15 @@ func (w *Worker) Start(ctx context.Context) {
 				zap.String("taskName", t.Name),
 			)
 
+			if t.Retries >= w.maxTaskRetries {
+				if err := w.queue.Ack(ctx, t); err != nil {
+					logger.Error("failed to ack task in queue", zap.Error(err))
+				} else {
+					logger.Error("task removed from queue due to too many failed executions")
+				}
+				continue
+			}
+
 			executed, err := w.Execute(ctx, t)
 			if err != nil {
 				logger.Error("error executing task", zap.Error(err))
diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go
index 29615ad..6deb9e9 100644
--- a/internal/listexecutor/listexecutor.go
+++ b/internal/listexecutor/listexecutor.go
@@ -139,6 +139,11 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) {
 			list.State = taskpkg.Failed
 		}
 		state.Groups = append(state.Groups, groupState)
+
+		//mark taskList as `Failed` if the group's state is `Failed`
+		if *groupState.Status == taskpkg.Failed {
+			list.State = taskpkg.Failed
+		}
 	}
 
 	if list.State != taskpkg.Failed {
diff --git a/internal/service/task/task.go b/internal/service/task/task.go
index f1d4db4..8847b7d 100644
--- a/internal/service/task/task.go
+++ b/internal/service/task/task.go
@@ -39,6 +39,7 @@ type Task struct {
 	FinalPolicy    string    `json:"finalPolicy"`    // FinalPolicy to be executed on the task response.
 	CacheNamespace string    `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key.
 	CacheScope     string    `json:"cacheScope"`     // CacheScope if set, is used for constructing cache key.
+	Retries        int       `json:"retries"`        // Retries is the number of failed attempts to execute this task
 	CreatedAt      time.Time `json:"createdAt"`      // CreatedAt specifies task creation time.
 	StartedAt      time.Time `json:"startedAt"`      // StartedAt specifies task execution start time.
 	FinishedAt     time.Time `json:"finishedAt"`     // FinishedAt specifies the time when the task is done.
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index 4a8d1ff..520348f 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -113,7 +113,7 @@ func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
 // it can be retrieved for processing again.
 func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
 	filter := bson.M{"id": t.ID}
-	update := bson.M{"$set": bson.M{"state": task.Created}}
+	update := bson.M{"$set": bson.M{"state": task.Created, "retries": t.Retries + 1}}
 	_, err := s.tasks.UpdateOne(ctx, filter, update)
 	return err
 }
-- 
GitLab