Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package executor
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task"
)
type Worker struct {
tasks chan *taskpkg.Task
queue Queue
policy Policy
httpClient *http.Client
logger *zap.Logger
}
func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, httpClient *http.Client, logger *zap.Logger) *Worker {
return &Worker{
tasks: tasks,
queue: queue,
policy: policy,
httpClient: httpClient,
logger: logger,
}
}
func (w *Worker) Start(ctx context.Context) {
defer w.logger.Debug("task worker stopped")
for {
select {
case <-ctx.Done():
return
case t := <-w.tasks:
logger := w.logger.With(
zap.String("taskID", t.ID),
zap.String("taskName", t.Name),
)
executed, err := w.Execute(ctx, t)
if err != nil {
logger.Error("error executing task", zap.Error(err))
if err := w.queue.Unack(ctx, t); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}
if err := w.saveTaskHistory(ctx, executed); err != nil {
logger.Error("error saving task history", zap.Error(err))
continue
}
// remove task from queue
if err := w.queue.Ack(ctx, executed); err != nil {
logger.Error("failed to ack task in queue", zap.Error(err))
}
}
}
}
func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) {
task.StartedAt = time.Now()
// evaluate request policy
if task.RequestPolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request)
if err != nil {
return nil, errors.New("error evaluating request policy", err)
}
// overwrite task request with the one returned from the policy
task.Request = resp
}
status, response, err := w.do(ctx, task)
if err != nil {
return nil, err
}
task.State = taskpkg.Done
task.ResponseCode = status
task.Response = response
// evaluate response policy
if task.ResponsePolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.ResponsePolicy, task.Response)
if err != nil {
return nil, errors.New("error evaluating response policy", err)
}
// overwrite task response with the one returned from the policy
task.Response = resp
}
// evaluate finalizer policy
if task.FinalPolicy != "" {
if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil {
return nil, errors.New("error evaluating final policy", err)
}
}
task.FinishedAt = time.Now()
return task, nil
}
func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request))
if err != nil {
return 0, nil, errors.New("error creating http request", err)
}
resp, err := w.httpClient.Do(req.WithContext(ctx))
if err != nil {
return 0, nil, errors.New("error executing http request", err)
}
response, err = io.ReadAll(resp.Body)
if err != nil {
return 0, nil, errors.New("error reading response body", err)
}
return resp.StatusCode, response, nil
}
func (w *Worker) saveTaskHistory(ctx context.Context, task *taskpkg.Task) error {
return fmt.Errorf("not implemented")
}