Newer
Older
"context"
"encoding/json"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
goatasklist "code.vereign.com/gaiax/tsa/task/gen/task_list"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
//go:generate counterfeiter . Storage
//go:generate counterfeiter . Queue
// Storage gives the service read access to persisted taskList templates,
// task templates, taskLists and the tasks belonging to a group.
type Storage interface {
// TaskListTemplate returns the taskList template stored under taskListName.
TaskListTemplate(ctx context.Context, taskListName string) (*Template, error)
// TaskTemplates returns the task templates for the given names; callers in
// this file look the templates up in the returned map by task name.
TaskTemplates(ctx context.Context, names []string) (map[string]*task.Task, error)
// TaskList returns the taskList with the given ID. TaskListResult uses it
// as a fallback when the history lookup yields no taskList.
TaskList(ctx context.Context, taskListID string) (*TaskList, error)
// TaskListHistory returns the taskList with the given ID from the history
// collection. TaskListResult tolerates a NotFound error from this call.
TaskListHistory(ctx context.Context, taskListID string) (*TaskList, error)
// GetGroupTasks returns the tasks that belong to the given group.
GetGroupTasks(ctx context.Context, group *Group) ([]*task.Task, error)
}
// Queue accepts newly created taskLists for execution.
type Queue interface {
// AddTaskList enqueues a taskList together with the tasks created for it.
AddTaskList(ctx context.Context, taskList *TaskList, tasks []*task.Task) error
}
// Cache is the source of results for taskLists that are Done or Failed.
type Cache interface {
// Get returns the raw value stored under key within the given namespace
// and scope. TaskListResult passes the taskList ID as key and decodes the
// returned bytes as JSON.
Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
}
type Service struct {
storage Storage
queue Queue
func New(template Storage, queue Queue, cache Cache, logger *zap.Logger) *Service {
return &Service{
storage: template,
queue: queue,
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
logger: logger,
}
}
// Create instantiates a taskList and its tasks from predefined templates and
// hands both to the queue for execution.
//
// A BadRequest error is returned when the taskList name is missing or the
// request data cannot be marshaled to JSON. Storage errors are returned
// unchanged; task-creation and queue errors are wrapped. On success the
// result carries the ID generated for the new taskList.
func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListRequest) (*goatasklist.CreateTaskListResult, error) {
	if req.TaskListName == "" {
		return nil, errors.New(errors.BadRequest, "missing taskListName")
	}

	logger := s.logger.With(zap.String("taskListName", req.TaskListName))

	// get predefined taskList definition from storage
	template, err := s.storage.TaskListTemplate(ctx, req.TaskListName)
	if err != nil {
		logger.Error("error getting taskList template from storage", zap.Error(err))
		return nil, err
	}

	// get predefined task definitions from storage
	taskTemplates, err := s.storage.TaskTemplates(ctx, taskNamesFromTaskListTemplate(template))
	if err != nil {
		// attach the error to the log entry, like every other error log in this method
		logger.Error("error getting task templates from storage", zap.Error(err))
		return nil, err
	}

	taskListRequest, err := json.Marshal(req.Data)
	if err != nil {
		logger.Error("error marshaling request data to JSON", zap.Error(err))
		return nil, errors.New(errors.BadRequest, "error marshaling request data to JSON", err)
	}

	taskList := &TaskList{
		ID:             uuid.NewString(),
		Groups:         createGroups(template, taskListRequest),
		Name:           template.Name,
		Request:        taskListRequest,
		CacheScope:     template.CacheScope,
		CacheNamespace: template.CacheNamespace,
		State:          task.Created,
	}

	// if cache namespace and scope are given, use them instead of the defaults
	if req.CacheNamespace != nil && *req.CacheNamespace != "" {
		taskList.CacheNamespace = *req.CacheNamespace
	}
	if req.CacheScope != nil && *req.CacheScope != "" {
		taskList.CacheScope = *req.CacheScope
	}

	tasks, err := createTasks(taskList, taskTemplates)
	if err != nil {
		logger.Error("failed to create tasks for taskList", zap.Error(err))
		return nil, errors.New("failed to create tasks for taskList", err)
	}

	if err := s.queue.AddTaskList(ctx, taskList, tasks); err != nil {
		logger.Error("error adding taskList to queue", zap.Error(err))
		return nil, errors.New("error adding taskList to queue", err)
	}

	return &goatasklist.CreateTaskListResult{
		TaskListID: taskList.ID,
	}, nil
}
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// TaskListResult reports the status of a taskList together with the statuses
// of its groups and tasks.
//
// The taskList is looked up in the history collection first and, if it is not
// there, in the taskLists collection. While the taskList is neither Done nor
// Failed its status is assembled from storage; once it has finished, the
// stored result is read from the cache and decoded from JSON.
func (s *Service) TaskListResult(ctx context.Context, req *goatasklist.TaskListResultRequest) (res *goatasklist.TaskListStatus, err error) {
	if req.TaskListID == "" {
		return nil, errors.New(errors.BadRequest, "missing taskListID")
	}

	logger := s.logger.With(zap.String("taskListID", req.TaskListID))

	// a NotFound from the history collection is not fatal: fall through to
	// the taskLists collection below
	list, err := s.storage.TaskListHistory(ctx, req.TaskListID)
	if err != nil && !errors.Is(errors.NotFound, err) {
		logger.Error("error getting taskList from history collection", zap.Error(err))
		return nil, err
	}

	if list == nil {
		if list, err = s.storage.TaskList(ctx, req.TaskListID); err != nil {
			if !errors.Is(errors.NotFound, err) {
				logger.Error("error getting taskList from taskLists collection", zap.Error(err))
				return nil, err
			}
			return nil, errors.New("taskList is not found", err)
		}
	}

	finished := list.State == task.Done || list.State == task.Failed
	if !finished {
		// taskList is not executed yet
		status, err := s.calculateState(ctx, list)
		if err != nil {
			logger.Error("error calculating taskList state", zap.Error(err))
			return nil, err
		}
		return status, nil
	}

	// taskList is already executed
	raw, err := s.cache.Get(ctx, list.ID, list.CacheNamespace, list.CacheScope)
	if err != nil {
		logger.Error("error getting taskList result from cache", zap.Error(err))
		return nil, err
	}

	var result *goatasklist.TaskListStatus
	if err := json.NewDecoder(bytes.NewReader(raw)).Decode(&result); err != nil {
		logger.Error("error decoding result from cache", zap.Error(err))
		return nil, errors.New("error decoding result from cache", err)
	}

	return result, nil
}
func createGroups(t *Template, req []byte) []Group {
var groups []Group
for _, group := range t.Groups {
g := Group{
ID: uuid.NewString(),
Execution: group.Execution,
Tasks: group.Tasks,
State: task.Created,
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
FinalPolicy: group.FinalPolicy,
}
groups = append(groups, g)
}
return groups
}
// createTasks instantiates a task.Task for every task name listed in the
// groups of the taskList, taking the task definition from the template with
// the same name. The resulting tasks are meant to be added to the queue for
// execution. A NotFound error is returned when a task name has no template.
func createTasks(t *TaskList, templates map[string]*task.Task) ([]*task.Task, error) {
	var tasks []*task.Task

	for _, group := range t.Groups {
		for _, name := range group.Tasks {
			tmpl, found := templates[name]
			if !found {
				return nil, errors.New(errors.NotFound, "failed to find task template")
			}

			// cache namespace and scope set on the taskList take precedence
			// over the defaults coming from the task template
			namespace, scope := tmpl.CacheNamespace, tmpl.CacheScope
			if t.CacheNamespace != "" {
				namespace = t.CacheNamespace
			}
			if t.CacheScope != "" {
				scope = t.CacheScope
			}

			tasks = append(tasks, &task.Task{
				ID:             uuid.NewString(),
				GroupID:        group.ID,
				Name:           name,
				State:          task.Created,
				URL:            tmpl.URL,
				Method:         tmpl.Method,
				RequestPolicy:  tmpl.RequestPolicy,
				ResponsePolicy: tmpl.ResponsePolicy,
				FinalPolicy:    tmpl.FinalPolicy,
				CacheNamespace: namespace,
				CacheScope:     scope,
				CreatedAt:      time.Now(),
			})
		}
	}

	return tasks, nil
}
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// calculateState assembles the status of a taskList from storage: the
// taskList's own ID and state, followed by the ID and state of every group
// and of every task that storage returns for that group. The first storage
// error aborts the calculation and is returned unchanged.
func (s *Service) calculateState(ctx context.Context, list *TaskList) (*goatasklist.TaskListStatus, error) {
	status := &goatasklist.TaskListStatus{
		ID:     list.ID,
		Status: string(list.State),
	}

	for i := range list.Groups {
		// take the address of the slice element so the reported ID pointers
		// refer to the taskList's own groups rather than to loop copies
		group := &list.Groups[i]

		groupTasks, err := s.storage.GetGroupTasks(ctx, group)
		if err != nil {
			return nil, err
		}

		groupStatus := &goatasklist.GroupStatus{
			ID:     &group.ID,
			Status: ptr.String(string(group.State)),
		}
		for _, t := range groupTasks {
			groupStatus.Tasks = append(groupStatus.Tasks, &goatasklist.TaskStatus{
				ID:     &t.ID,
				Status: ptr.String(string(t.State)),
			})
		}

		status.Groups = append(status.Groups, groupStatus)
	}

	return status, nil
}
// taskNamesFromTaskListTemplate returns the names of all tasks within
// one taskList template
func taskNamesFromTaskListTemplate(template *Template) []string {
var names []string
for _, group := range template.Groups {