diff --git a/cmd/task/main.go b/cmd/task/main.go index 7e5d905c8a1eea722d50cc626bbf7a989b454b1e..d7853699eacdd6406531057c77fe0b3c8670c9c7 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -30,6 +30,7 @@ import ( "code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/executor" + "code.vereign.com/gaiax/tsa/task/internal/listexecutor" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" "code.vereign.com/gaiax/tsa/task/internal/service/task" @@ -87,6 +88,17 @@ func main() { logger, ) + listExecutor := listexecutor.New( + storage, + policy, + storage, + cache, + cfg.ListExecutor.Workers, + cfg.ListExecutor.PollInterval, + httpClient(), + logger, + ) + // create services var ( taskSvc goatask.Service @@ -169,6 +181,9 @@ func main() { g.Go(func() error { return executor.Start(ctx) }) + g.Go(func() error { + return listExecutor.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } diff --git a/internal/config/config.go b/internal/config/config.go index f68e90ba1d3c58f86e468b9b72b4e60e791d6d91..582623bfd107cbcaf889edf5a68b900f1e4e5e6f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,11 +3,12 @@ package config import "time" type Config struct { - HTTP httpConfig - Mongo mongoConfig - Policy policyConfig - Executor executorConfig - Cache cacheConfig + HTTP httpConfig + Mongo mongoConfig + Policy policyConfig + Executor executorConfig + ListExecutor listExecutorConfig + Cache cacheConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -35,6 +36,11 @@ type executorConfig struct { PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` } +type listExecutorConfig struct { + Workers int `envconfig:"LIST_EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` +} + type cacheConfig struct { Addr string `envconfig:"CACHE_ADDR" required:"true"` } diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go new file mode 100644 index 0000000000000000000000000000000000000000..cff312d11a13201a2ebb2cad50ef080a1180e164 --- /dev/null +++ b/internal/listexecutor/listexecutor.go @@ -0,0 +1,299 @@ +package listexecutor + +import ( + "bytes" + "context" + "io" + "net/http" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" +) + +type token struct{} + +const ( + sequential = "sequential" + parallel = "parallel" +) + +// Policy client. +type Policy interface { + Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) +} + +// Queue allows retrieving, returning and deleting taskLists and group tasks from storage. +type Queue interface { + PollList(ctx context.Context) (*tasklist.TaskList, error) + AckList(ctx context.Context, taskList *tasklist.TaskList) error + AckGroupTasks(ctx context.Context, group *tasklist.Group) error +} + +type Storage interface { + GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*taskpkg.Task, error) + SaveTaskHistory(ctx context.Context, task *taskpkg.Task) error + SaveTaskListHistory(ctx context.Context, task *tasklist.TaskList) error +} + +type Cache interface { + Set(ctx context.Context, key, namespace, scope string, value []byte) error + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + +type ListExecutor struct { + queue Queue + policy Policy + storage Storage + cache Cache + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New( + queue Queue, + policy Policy, + storage Storage, + cache Cache, + workers int, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *ListExecutor { + return &ListExecutor{ + queue: queue, + policy: policy, + storage: storage, + cache: cache, + workers: workers, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, + } +} + +func (l *ListExecutor) Start(ctx context.Context) error { + defer l.logger.Info("taskList executor stopped") + + // buffered channel used as a semaphore to limit concurrent executions + sem := make(chan token, l.workers) + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(l.pollInterval): + taskList, err := l.queue.PollList(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + l.logger.Error("error getting taskList from queue", zap.Error(err)) + } + continue + } + + sem <- token{} // acquire a semaphore + go func(list *tasklist.TaskList) { + l.Execute(ctx, list) + <-sem // release the semaphore + }(taskList) + } + } + + // wait for completion + for n := l.workers; n > 0; n-- { + sem <- token{} + } + + return ctx.Err() +} + +func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { + logger := l.logger.With( + zap.String("taskListID", list.ID), + zap.String("taskListName", list.Name), + ) + list.State = taskpkg.Pending + list.StartTime = time.Now() + + // execute groups sequentially + for i, group := range list.Groups { + if err := l.executeGroup(ctx, &group); err != nil { + logger.Error("error executing group", zap.Error(err)) + group.State = taskpkg.Failed + list.State = taskpkg.Failed + } + list.Groups[i] = group + } + + if list.State != taskpkg.Failed { + list.State = taskpkg.Done + } + list.FinishTime = time.Now() + + if err := l.storage.SaveTaskListHistory(ctx, list); err != nil { + logger.Error("error saving taskList history", zap.Error(err)) + } else { + logger.Debug("taskList history is saved") + } + + if err := l.queue.AckList(ctx, list); err != nil { + logger.Error("failed to ack taskList in queue", zap.Error(err)) + } +} + +func (l *ListExecutor) executeGroup(ctx context.Context, group *tasklist.Group) error { + switch exec := group.Execution; exec { + case sequential: + return l.executeSequential(ctx, group) + case parallel: + return errors.New("not implemented") + } + + return errors.New("unknown type of group execution") +} + +func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Group) error { + group.State = taskpkg.Pending + + tasks, err := l.storage.GetGroupTasks(ctx, group) + if err != nil { + return err + } + + req := group.Request + for _, task := range tasks { + logger := l.logger.With( + zap.String("taskID", task.ID), + zap.String("taskName", task.Name), + ) + + // mark all subsequent tasks as failed if one task already failed + if group.State == taskpkg.Failed { + task.State = taskpkg.Failed + continue + } + + task.Request = req + err := l.executeTask(ctx, task) + if err != nil { + task.State = taskpkg.Failed + group.State = taskpkg.Failed + logger.Error("error executing task", zap.Error(err)) + continue + } + logger.Debug("task execution completed successfully") + + // pass the response from current task to the next task + req = task.Response + if bytes.Compare(task.Response, []byte(nil)) == 0 { + req = []byte{} + } + + if err := l.cache.Set( + ctx, + task.ID, + task.CacheNamespace, + task.CacheScope, + task.Response, + ); err != nil { + logger.Error("error storing task result in cache", zap.Error(err)) + continue + } + logger.Debug("task results are stored in cache") + + if err := l.storage.SaveTaskHistory(ctx, task); err != nil { + logger.Error("error saving task history", zap.Error(err)) + continue + } + logger.Debug("task history is saved") + } + + // remove tasks from queue + if err := l.queue.AckGroupTasks(ctx, group); err != nil { + l.logger.With(zap.String("groupID", group.ID)).Error("failed to ack group tasks in queue", zap.Error(err)) + } + + if group.State != taskpkg.Failed { + group.State = taskpkg.Done + } + + return nil +} + +func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) error { + task.StartedAt = time.Now() + + var response []byte + var err error + + // task is executing a request policy OR an HTTP call to predefined URL + if task.RequestPolicy != "" { + response, err = l.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + if err != nil { + return errors.New("error evaluating request policy", err) + } + task.ResponseCode = http.StatusOK + } else if task.URL != "" && task.Method != "" { + var status int + status, response, err = l.doHTTPTask(ctx, task) + if err != nil { + return err + } + task.ResponseCode = status + } else { + return errors.New(errors.Internal, "invalid task: must define either request policy or url") + } + + task.Response = response + + // evaluate response policy + if task.ResponsePolicy != "" { + resp, err := l.policy.Evaluate(ctx, task.ResponsePolicy, task.Response) + if err != nil { + return errors.New("error evaluating response policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + // evaluate finalizer policy + if task.FinalPolicy != "" { + resp, err := l.policy.Evaluate(ctx, task.FinalPolicy, task.Response) + if err != nil { + return errors.New("error evaluating final policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + task.State = taskpkg.Done + task.FinishedAt = time.Now() + return nil +} + +func (l *ListExecutor) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := l.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() // nolint:errcheck + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil +} diff --git a/internal/service/task/service.go b/internal/service/task/service.go index c999eace7b3237e631178439158c250bb1b995a5..a7dde0cf2c011b9bd52c2bbc4c5996fbdbaa3f56 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -121,7 +121,7 @@ func (s *Service) TaskResult(ctx context.Context, req *goatask.TaskResultRequest } } - if task.State != Done { + if task.State != Done && task.State != Failed { return nil, errors.New(errors.NotFound, "no result, task is not completed") } diff --git a/internal/service/task/task.go b/internal/service/task/task.go index e164797533886ed5536fd10de231b92a0f3a0185..f1d4db423724579cb1a3830488d2c0984e25c11d 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -18,6 +18,10 @@ const ( // Done state is when the task is completed. // TODO(penkovski): do we need this state if task is deleted after it is done? Done = "done" + + // Failed state is when the task execution failed but the task is part + // of a group and later execution is not possible, so it could not be "Unack"-ed + Failed = "failed" ) type Task struct { diff --git a/internal/service/tasklist/service.go b/internal/service/tasklist/service.go index 51140eef53041def68512d8d8e61946a118662c5..2aec2fcfe1c20a6fd2ebfafe3d48d26b1ecfb89a 100644 --- a/internal/service/tasklist/service.go +++ b/internal/service/tasklist/service.go @@ -72,7 +72,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq taskList := &TaskList{ ID: uuid.NewString(), - Groups: createGroups(template), + Groups: createGroups(template, taskListRequest), Name: template.Name, Request: taskListRequest, CacheScope: template.CacheScope, @@ -105,7 +105,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq }, nil } -func createGroups(t *Template) []Group { +func createGroups(t *Template, req []byte) []Group { var groups []Group for _, group := range t.Groups { g := Group{ @@ -114,6 +114,7 @@ func createGroups(t *Template) []Group { Tasks: group.Tasks, State: task.Created, Metadata: group.Metadata, + Request: req, FinalPolicy: group.FinalPolicy, } groups = append(groups, g) diff --git a/internal/service/tasklist/task_list.go b/internal/service/tasklist/task_list.go index 0e3e7f542972d3c531e55225f5a8a39f4dc2c851..c01cd1be6a50ed8552e6026fda82f222bfee181b 100644 --- a/internal/service/tasklist/task_list.go +++ b/internal/service/tasklist/task_list.go @@ -38,6 +38,7 @@ type Group struct { Execution string `json:"execution"` Tasks []string `json:"tasks"` State task.State `json:"state"` + Request []byte `json:"request"` Metadata interface{} `json:"metadata"` // TODO(penkovski): not yet clear FinalPolicy string `json:"finalPolicy"` } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 4a25cf649a8f7c96650d4d478de906cd531b6b72..d76f613471b0e24f1222a40b79e07c188c0caece 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -21,6 +21,7 @@ const ( tasksHistory = "tasksHistory" taskListQueue = "taskLists" taskListTemplates = "taskListTemplates" + taskListHistory = "taskListHistory" ) type Storage struct { @@ -29,6 +30,7 @@ type Storage struct { tasksHistory *mongo.Collection taskLists *mongo.Collection taskListTemplates *mongo.Collection + taskListHistory *mongo.Collection } func New(db *mongo.Client) *Storage { @@ -38,6 +40,7 @@ func New(db *mongo.Client) *Storage { tasksHistory: db.Database(taskDB).Collection(tasksHistory), taskListTemplates: db.Database(taskDB).Collection(taskListTemplates), taskLists: db.Database(taskDB).Collection(taskListQueue), + taskListHistory: db.Database(taskDB).Collection(taskListHistory), } } @@ -238,3 +241,77 @@ func (s *Storage) AckList(ctx context.Context, taskList *tasklist.TaskList) erro _, err := s.taskLists.DeleteOne(ctx, bson.M{"id": taskList.ID}) return err } + +// PollList retrieves one taskList from the taskLists collection +// with the older ones being retrieved first (FIFO). It updates the state +// of the task to "pending", so that consequent calls to PollList would +// not retrieve the same task. +func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) { + opts := options. + FindOneAndUpdate(). + SetSort(bson.M{"createdAt": 1}). + SetReturnDocument(options.After) + + filter := bson.M{"state": task.Created} + update := bson.M{"$set": bson.M{"state": task.Pending}} + result := s.taskLists.FindOneAndUpdate( + ctx, + filter, + update, + opts, + ) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "taskList not found") + } + return nil, result.Err() + } + + var list tasklist.TaskList + if err := result.Decode(&list); err != nil { + return nil, err + } + + return &list, nil +} + +// GetGroupTasks fetches all tasks by a groupID +func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) { + filter := bson.M{"groupid": group.ID} + opts := options.Find().SetSort(bson.M{"createdat": 1}) + + cursor, err := s.tasks.Find(ctx, filter, opts) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var tasks []*task.Task + for cursor.Next(ctx) { + var task task.Task + if err := cursor.Decode(&task); err != nil { + return nil, err + } + tasks = append(tasks, &task) + } + + return tasks, nil +} + +// AckGroupTasks removes tasks from tasks collection by groupID +func (s *Storage) AckGroupTasks(ctx context.Context, group *tasklist.Group) error { + _, err := s.tasks.DeleteMany(ctx, bson.M{"groupid": group.ID}) + return err +} + +// SaveTaskListHistory adds a tasklist to the taskListHistory collection +func (s *Storage) SaveTaskListHistory(ctx context.Context, taskList *tasklist.TaskList) error { + insert := func() error { + _, err := s.taskListHistory.InsertOne(ctx, taskList) + return err + } + + b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + return backoff.Retry(insert, b) +}