Newer
Older
package executor
import (
"context"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
type Policy interface {
Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}
// Queue allows retrieving, returning and deleting tasks from storage.
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
type Queue interface {
Poll(ctx context.Context) (*task.Task, error)
Ack(ctx context.Context, task *task.Task) error
Unack(ctx context.Context, task *task.Task) error
}
type Executor struct {
queue Queue
policy Policy
workers int
pollInterval time.Duration
httpClient *http.Client
logger *zap.Logger
}
func New(
queue Queue,
policy Policy,
workers int,
pollInterval time.Duration,
httpClient *http.Client,
logger *zap.Logger,
) *Executor {
return &Executor{
queue: queue,
policy: policy,
workers: workers,
pollInterval: pollInterval,
httpClient: httpClient,
logger: logger,
}
}
func (e *Executor) Start(ctx context.Context) error {
defer e.logger.Info("task executor stopped")
var wg sync.WaitGroup
tasks := make(chan *task.Task)
for i := 0; i < e.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
worker.Start(ctx)
}()
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-time.After(e.pollInterval):
t, err := e.queue.Poll(ctx)
if err != nil {
if !errors.Is(errors.NotFound, err) {
e.logger.Error("error getting task from queue", zap.Error(err))
}
continue
}
tasks <- t // send task to the workers for execution
}
}
wg.Wait() // wait all workers to stop
return ctx.Err()
}