Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package executor
import (
"context"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
// Policy is the policy-evaluation dependency handed to every worker
// started by Executor.Start.
type Policy interface {
// Evaluate takes a policy identifier and a raw input payload and returns
// a raw result or an error.
// NOTE(review): presumably policy is a policy name/path and data/result
// are JSON documents — confirm against the implementation.
Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}
// Queue is the task source consumed by the Executor. Start calls Poll
// periodically and also passes the queue to each worker.
type Queue interface {
// Poll returns the next task, or an error. Start treats an error of kind
// errors.NotFound as "nothing to do" and does not log it.
Poll(ctx context.Context) (*task.Task, error)
// Ack acknowledges the given task.
// NOTE(review): presumably removes it from the queue after successful
// processing — confirm against the implementation.
Ack(ctx context.Context, task *task.Task) error
// Unack reverts the acknowledgement state of the given task.
// NOTE(review): presumably makes the task available to Poll again —
// confirm against the implementation.
Unack(ctx context.Context, task *task.Task) error
}
// Executor polls a Queue for tasks and distributes them to a fixed number
// of worker goroutines. Create one with New and run it with Start.
type Executor struct {
// queue is polled for tasks by Start and is also passed to each worker.
queue Queue
// policy is passed to each worker.
policy Policy
// workers is the number of worker goroutines launched by Start.
workers int
// pollInterval is the delay Start waits before each call to queue.Poll.
pollInterval time.Duration
// httpClient is passed to each worker.
httpClient *http.Client
// logger receives the executor's log output and is passed to each worker.
logger *zap.Logger
}
// New builds an Executor from its dependencies and settings.
//
// The arguments are stored as given, with no validation or defaulting:
// workers is the number of worker goroutines Start will launch and
// pollInterval is the delay between successive queue polls.
func New(
	queue Queue,
	policy Policy,
	workers int,
	pollInterval time.Duration,
	httpClient *http.Client,
	logger *zap.Logger,
) *Executor {
	exec := new(Executor)

	// Collaborators.
	exec.queue = queue
	exec.policy = policy
	exec.httpClient = httpClient
	exec.logger = logger

	// Tuning knobs.
	exec.workers = workers
	exec.pollInterval = pollInterval

	return exec
}
// Start runs the executor until ctx is cancelled.
//
// It launches e.workers worker goroutines that all receive from one shared
// unbuffered channel. It then polls the queue every e.pollInterval and
// hands each polled task to whichever worker is ready. Poll errors are
// logged unless they are of kind errors.NotFound, and polling continues.
//
// Start blocks. Once ctx is cancelled it stops polling, waits for every
// worker's Start call to return, and returns ctx.Err().
func (e *Executor) Start(ctx context.Context) error {
	defer e.logger.Info("task executor stopped")

	var wg sync.WaitGroup
	tasks := make(chan *task.Task)

	for i := 0; i < e.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
			worker.Start(ctx)
		}()
	}

loop:
	for {
		select {
		case <-ctx.Done():
			break loop
		case <-time.After(e.pollInterval):
			t, err := e.queue.Poll(ctx)
			if err != nil {
				if !errors.Is(errors.NotFound, err) {
					e.logger.Error("error getting task from queue", zap.Error(err))
				}
				continue
			}

			// Hand the task to a worker, but never block past cancellation.
			// The channel is unbuffered, so a bare send would hang forever
			// if ctx is cancelled while all workers are busy or have
			// already exited, and Start would never return.
			select {
			case tasks <- t:
			case <-ctx.Done():
				// NOTE(review): the polled task is not dispatched here.
				// Consider whether it should be Unack'ed on shutdown;
				// that depends on Queue semantics not visible here.
				break loop
			}
		}
	}

	wg.Wait() // wait for all workers to stop
	return ctx.Err()
}