Newer
Older
package executor
import (
"bytes"
"context"
"io"
"net/http"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task"
)
type Worker struct {
tasks chan *taskpkg.Task
queue Queue
policy Policy

Lyuben Penkovski
committed
storage Storage
httpClient *http.Client
logger *zap.Logger
}
func newWorker(
tasks chan *taskpkg.Task,
queue Queue,
policy Policy,
storage Storage,
cache Cache,
httpClient *http.Client,
logger *zap.Logger,
) *Worker {
return &Worker{
tasks: tasks,
queue: queue,
policy: policy,

Lyuben Penkovski
committed
storage: storage,
httpClient: httpClient,
logger: logger,
}
}
func (w *Worker) Start(ctx context.Context) {
defer w.logger.Debug("task worker stopped")
for {
select {
case <-ctx.Done():
return
case t := <-w.tasks:
logger := w.logger.With(
zap.String("taskID", t.ID),
zap.String("taskName", t.Name),
)
executed, err := w.Execute(ctx, t)
if err != nil {
logger.Error("error executing task", zap.Error(err))
if err := w.queue.Unack(ctx, t); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}
if err := w.cache.Set(
ctx,
executed.ID,
executed.CacheNamespace,
executed.CacheScope,
executed.Response,
); err != nil {
logger.Error("error storing task result in cache", zap.Error(err))
if err := w.queue.Unack(ctx, t); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}

Lyuben Penkovski
committed
if err := w.storage.SaveTaskHistory(ctx, executed); err != nil {
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
logger.Error("error saving task history", zap.Error(err))
continue
}
// remove task from queue
if err := w.queue.Ack(ctx, executed); err != nil {
logger.Error("failed to ack task in queue", zap.Error(err))
}
}
}
}
func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) {
task.StartedAt = time.Now()
// evaluate request policy
if task.RequestPolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request)
if err != nil {
return nil, errors.New("error evaluating request policy", err)
}
// overwrite task request with the one returned from the policy
task.Request = resp
}
status, response, err := w.do(ctx, task)
if err != nil {
return nil, err
}
task.State = taskpkg.Done
task.ResponseCode = status
task.Response = response
// evaluate response policy
if task.ResponsePolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.ResponsePolicy, task.Response)
if err != nil {
return nil, errors.New("error evaluating response policy", err)
}
// overwrite task response with the one returned from the policy
task.Response = resp
}
// evaluate finalizer policy
if task.FinalPolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response)
if err != nil {
return nil, errors.New("error evaluating final policy", err)
}
// overwrite task response with the one returned from the policy
task.Response = resp
}
task.FinishedAt = time.Now()
return task, nil
}
func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request))
if err != nil {
return 0, nil, errors.New("error creating http request", err)
}
resp, err := w.httpClient.Do(req.WithContext(ctx))
if err != nil {
return 0, nil, errors.New("error executing http request", err)
}
response, err = io.ReadAll(resp.Body)
if err != nil {
return 0, nil, errors.New("error reading response body", err)
}
return resp.StatusCode, response, nil
}