Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//package executor
//
//import (
// "bytes"
// "context"
// "io"
// "net/http"
// "sync"
// "time"
//
// "go.uber.org/zap"
//
// "code.vereign.com/gaiax/tsa/golib/errors"
// "code.vereign.com/gaiax/tsa/task/internal/service/task"
//)
//
//const workers = 5
//
//type Queue interface {
// Poll(ctx context.Context) (*task.Task, error)
// Ack(ctx context.Context, task *task.Task) error
// Unack(ctx context.Context, task *task.Task) error
//}
//
//type Executor struct {
// queue Queue
// workers int
// pollInterval time.Duration
//
// httpClient *http.Client
// logger *zap.Logger
//}
//
//func New() *Executor {
// return &Executor{
// httpClient: http.DefaultClient,
// }
//}
//
//func (e *Executor) Start(ctx context.Context) {
// if e.workers == 0 {
// e.workers = workers
// }
//
// var wg sync.WaitGroup
// for i := 0; i < e.workers; i++ {
// wg.Add(1)
// go func() {
// defer wg.Done()
// e.Worker(ctx)
// }()
// }
//
// <-ctx.Done()
// wg.Wait()
//
// e.logger.Info("task executor stopped")
//}
//
//func (e *Executor) Worker(ctx context.Context) {
// for {
// select {
// case <-ctx.Done():
// e.logger.Debug("task execution worker stopped")
// return
// case <-time.After(e.pollInterval):
// task, err := e.queue.Poll(ctx)
// if err != nil {
// if !errors.Is(errors.NotFound, err) {
// e.logger.Error("error getting task from queue", zap.Error(err))
// }
// continue
// }
//
// logger := e.logger.With(
// zap.String("taskID", task.ID),
// zap.String("taskName", task.Name),
// )
//
// executedTask, err := e.Execute(ctx, task)
// if err != nil {
// logger.Error("error executing task", zap.Error(err))
// if err := e.queue.Unack(ctx, task); err != nil {
// logger.Error("failed to unack task in queue", zap.Error(err))
// }
// continue
// }
//
// if len(executedTask.CacheKey) > 0 {
// if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil {
// // TODO(penkovski): should task execution repeat if result cannot be cached?
// logger.Error("failed to write task result in cache", zap.Error(err))
// }
// }
//
// if err := e.saveTaskHistory(ctx, executedTask); err != nil {
// logger.Error("failed to save task in history", zap.Error(err))
// }
// }
// }
//}
//
//func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) {
// // TODO: evaluate request policy
// t.StartedAt = time.Now()
// status, response, err := e.execute(ctx, t)
// if err != nil {
// return nil, err
// }
//
// t.State = task.Done
// t.ResponseCode = status
// t.Response = response
//
// if t.ResponsePolicy != "" {
// // TODO: evaluate response policy
// }
//
// if t.FinalPolicy != "" {
// // TODO: evaluate finalizer policy
// }
//
// t.FinishedAt = time.Now()
// return t, nil
//}
//
//func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) {
// req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request))
// if err != nil {
// return 0, nil, errors.New("error creating http request", err)
// }
//
// resp, err := e.httpClient.Do(req)
// if err != nil {
// return 0, nil, errors.New("error executing http request", err)
// }
// defer resp.Body.Close()
//
// response, err = io.ReadAll(resp.Body)
// if err != nil {
// return 0, nil, errors.New("error reading response body", err)
// }
//
// return resp.StatusCode, response, nil
//}
//
//func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error {
// return errors.New("not implemented")
//}
//
//func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error {
// return errors.New("not implemented")
//}