// Package executor polls a task queue at a fixed interval and hands the
// retrieved tasks to a pool of workers for execution.
package executor

import (
	"context"
	"net/http"
	"sync"
	"time"

	"go.uber.org/zap"

	"code.vereign.com/gaiax/tsa/golib/errors"
	"code.vereign.com/gaiax/tsa/task/internal/service/task"
)

// Policy client.
type Policy interface {
	// Evaluate takes a policy identifier and a data payload and returns a
	// result payload or an error.
	// NOTE(review): the exact format of policy/data/result is defined by the
	// implementation — confirm against the policy client.
	Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}

// Queue allows retrieving, returning and deleting tasks from storage.
type Queue interface {
	// Poll retrieves a task from the queue. Start treats an error of kind
	// errors.NotFound as "nothing to do" and does not log it.
	Poll(ctx context.Context) (*task.Task, error)
	// Ack acknowledges a task.
	// NOTE(review): presumably this deletes the task from storage — confirm.
	Ack(ctx context.Context, task *task.Task) error
	// Unack negatively acknowledges a task.
	// NOTE(review): presumably this returns the task to the queue so it can
	// be polled again — confirm.
	Unack(ctx context.Context, task *task.Task) error
}

// Executor polls tasks from a Queue and distributes them among a fixed
// number of workers.
type Executor struct {
	queue  Queue
	policy Policy

	workers      int           // number of worker goroutines started by Start
	pollInterval time.Duration // delay before each poll of the queue

	httpClient *http.Client
	logger     *zap.Logger
}

// New creates an Executor from the given dependencies and settings.
// It only stores its arguments; no goroutines are started until Start is
// called.
func New(
	queue Queue,
	policy Policy,
	workers int,
	pollInterval time.Duration,
	httpClient *http.Client,
	logger *zap.Logger,
) *Executor {
	return &Executor{
		queue:        queue,
		policy:       policy,
		workers:      workers,
		pollInterval: pollInterval,
		httpClient:   httpClient,
		logger:       logger,
	}
}

// Start launches the configured number of workers and then polls the queue
// every pollInterval, sending each retrieved task to the workers over an
// unbuffered channel.
//
// Start blocks until ctx is done. It then waits for all workers to return
// and reports ctx.Err().
func (e *Executor) Start(ctx context.Context) error {
	defer e.logger.Info("task executor stopped")

	var wg sync.WaitGroup
	tasks := make(chan *task.Task)
	for i := 0; i < e.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
			worker.Start(ctx)
		}()
	}

	// A single reusable timer avoids allocating a new one on every
	// iteration. It is re-armed only after a poll/dispatch cycle completes,
	// so the delay is always measured from the end of the previous cycle.
	timer := time.NewTimer(e.pollInterval)
	defer timer.Stop()

loop:
	for {
		select {
		case <-ctx.Done():
			break loop
		case <-timer.C:
		}

		t, err := e.queue.Poll(ctx)
		switch {
		case err == nil:
			// send task to the workers for execution
			if !e.dispatch(ctx, tasks, t) {
				break loop
			}
		case !errors.Is(errors.NotFound, err):
			e.logger.Error("error getting task from queue", zap.Error(err))
		}

		timer.Reset(e.pollInterval)
	}

	wg.Wait() // wait all workers to stop
	return ctx.Err()
}

// dispatch hands t to a worker. It returns true when a worker received the
// task and false when ctx was cancelled first.
//
// Selecting on ctx.Done() is essential: the channel is unbuffered, so a bare
// send would block forever if the context is cancelled while no worker is
// receiving, and Start would never return.
func (e *Executor) dispatch(ctx context.Context, tasks chan<- *task.Task, t *task.Task) bool {
	select {
	case tasks <- t:
		return true
	case <-ctx.Done():
	}

	if t == nil {
		return false
	}

	// The task was polled but never reached a worker. Best effort: hand it
	// back to the queue so it is not lost. ctx is already cancelled, so a
	// short-lived independent context is used for this call.
	// NOTE(review): assumes Unack makes the task available for polling
	// again — confirm against the Queue implementation.
	const unackTimeout = 5 * time.Second
	unackCtx, cancel := context.WithTimeout(context.Background(), unackTimeout)
	defer cancel()
	if err := e.queue.Unack(unackCtx, t); err != nil {
		e.logger.Error("error returning undelivered task to queue", zap.Error(err))
	}
	return false
}