Newer
Older
package executor
import (
"context"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
// Policy is the policy evaluation dependency used by the task executor.
type Policy interface {
	// Evaluate receives the policy designated by the policy string together
	// with the raw input data, and yields the raw evaluation result or an
	// error. The expected format of policy and data is not visible in this
	// file — confirm against the implementation.
	Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}
// Queue is the task queue consumed by the executor. It supports retrieving
// tasks, returning them and deleting them from storage.
type Queue interface {
	// Poll fetches a task from the queue.
	Poll(ctx context.Context) (*task.Task, error)
	// Ack acknowledges t. Presumably this is the "delete" operation — confirm
	// against the implementation.
	Ack(ctx context.Context, t *task.Task) error
	// Unack is the counterpart of Ack. Presumably it returns t to the queue —
	// confirm against the implementation.
	Unack(ctx context.Context, t *task.Task) error
}

Lyuben Penkovski
committed
// Storage is the persistence dependency used by the task executor.
type Storage interface {
	// SaveTaskHistory stores t and reports any failure as an error. Judging by
	// the name, the record goes into a task history — confirm against the
	// implementation.
	SaveTaskHistory(ctx context.Context, t *task.Task) error
}
// Cache is the key/value cache dependency used by the task executor.
// Entries are addressed by the combination of key, namespace and scope.
type Cache interface {
	// Set writes value under the given key, namespace and scope.
	Set(ctx context.Context, key, namespace, scope string, value []byte) error
	// Get reads the value addressed by the given key, namespace and scope.
	// How a missing entry is reported is not visible in this file.
	Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
}
queue Queue
policy Policy
storage Storage
cache Cache
workers int
pollInterval time.Duration
maxTaskRetries int
func New(
queue Queue,
policy Policy,

Lyuben Penkovski
committed
storage Storage,
workers int,
pollInterval time.Duration,
httpClient *http.Client,
logger *zap.Logger,
) *Executor {
queue: queue,
policy: policy,
storage: storage,
cache: cache,
workers: workers,
pollInterval: pollInterval,
maxTaskRetries: maxTaskRetries,
httpClient: httpClient,
logger: logger,
func (e *Executor) Start(ctx context.Context) error {
defer e.logger.Info("task executor stopped")
for i := 0; i < e.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.maxTaskRetries, e.httpClient, e.logger)
if err != nil {
if !errors.Is(errors.NotFound, err) {
e.logger.Error("error getting task from queue", zap.Error(err))
}
continue
}
tasks <- t // send task to the workers for execution