Skip to content
Snippets Groups Projects
Commit 066e96f4 authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#13 WIP: sequential taskList executor

parent d7e78ddb
No related branches found
No related tags found
1 merge request: !8 Sequential Task List executor
......@@ -30,6 +30,7 @@ import (
"code.vereign.com/gaiax/tsa/task/internal/clients/policy"
"code.vereign.com/gaiax/tsa/task/internal/config"
"code.vereign.com/gaiax/tsa/task/internal/executor"
"code.vereign.com/gaiax/tsa/task/internal/listexecutor"
"code.vereign.com/gaiax/tsa/task/internal/service"
"code.vereign.com/gaiax/tsa/task/internal/service/health"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
......@@ -87,6 +88,17 @@ func main() {
logger,
)
listExecutor := listexecutor.New(
storage,
policy,
storage,
cache,
cfg.ListExecutor.Workers,
cfg.ListExecutor.PollInterval,
httpClient(),
logger,
)
// create services
var (
taskSvc goatask.Service
......@@ -169,6 +181,9 @@ func main() {
g.Go(func() error {
return executor.Start(ctx)
})
g.Go(func() error {
return listExecutor.Start(ctx)
})
if err := g.Wait(); err != nil {
logger.Error("run group stopped", zap.Error(err))
}
......
......@@ -3,11 +3,12 @@ package config
import "time"
// Config is the root configuration of the service. Each field groups the
// settings of one component; every field is declared exactly once, because
// duplicate field names in a struct do not compile.
type Config struct {
	HTTP         httpConfig
	Mongo        mongoConfig
	Policy       policyConfig
	Executor     executorConfig
	ListExecutor listExecutorConfig
	Cache        cacheConfig

	// LogLevel is read from LOG_LEVEL and defaults to "INFO".
	LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
}
......@@ -35,6 +36,11 @@ type executorConfig struct {
PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
}
// listExecutorConfig holds the settings of the taskList executor.
type listExecutorConfig struct {
	// Workers is read from LIST_EXECUTOR_WORKERS and defaults to 5.
	Workers int `envconfig:"LIST_EXECUTOR_WORKERS" default:"5"`
	// PollInterval is read from LIST_EXECUTOR_POLL_INTERVAL and defaults to 1s.
	// It has its own variable so that it can be tuned independently of
	// EXECUTOR_POLL_INTERVAL, which is bound to executorConfig.PollInterval.
	PollInterval time.Duration `envconfig:"LIST_EXECUTOR_POLL_INTERVAL" default:"1s"`
}
// cacheConfig holds the settings of the cache.
type cacheConfig struct {
// Addr is read from CACHE_ADDR; the variable is required and has no default.
Addr string `envconfig:"CACHE_ADDR" required:"true"`
}
package listexecutor
import (
"bytes"
"context"
"io"
"net/http"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task"
"code.vereign.com/gaiax/tsa/task/internal/service/tasklist"
)
// token is the empty value sent through the semaphore channel that Start
// uses to limit the number of concurrent executions.
type token struct{}
// Values of Group.Execution that executeGroup dispatches on.
const (
// sequential groups are handled by executeSequential.
sequential = "sequential"
// parallel is recognized, but executeGroup returns a "not implemented" error for it.
parallel = "parallel"
)
// Policy is the client used to evaluate policies.
type Policy interface {
// Evaluate is called with a policy string taken from the task (request,
// response or final policy) and the input bytes; the returned bytes are
// used as the task response.
Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}
// Queue allows retrieving taskLists and removing taskLists and group tasks
// once they have been processed.
type Queue interface {
// PollList returns the next taskList to execute. Start treats an error of
// kind errors.NotFound as "nothing to do" and does not log it.
PollList(ctx context.Context) (*tasklist.TaskList, error)
// AckList is called by Execute after the taskList has been processed.
AckList(ctx context.Context, taskList *tasklist.TaskList) error
// AckGroupTasks is called after all tasks of the group have been processed.
AckGroupTasks(ctx context.Context, group *tasklist.Group) error
}
// Storage gives access to the tasks of a group and persists execution history.
type Storage interface {
// GetGroupTasks returns the tasks of the group; executeSequential runs
// them in the order in which they are returned.
GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*taskpkg.Task, error)
// SaveTaskHistory stores a processed task.
SaveTaskHistory(ctx context.Context, task *taskpkg.Task) error
// SaveTaskListHistory stores a processed taskList.
SaveTaskListHistory(ctx context.Context, task *tasklist.TaskList) error
}
// Cache stores and retrieves values addressed by key, namespace and scope.
// The executor stores each task response under the task ID together with the
// task's cache namespace and cache scope.
type Cache interface {
Set(ctx context.Context, key, namespace, scope string, value []byte) error
Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
}
// ListExecutor polls taskLists from a queue and executes them. The number of
// taskLists executed concurrently is limited by workers (see Start).
type ListExecutor struct {
queue Queue // source of taskLists; also used to ack lists and group tasks
policy Policy // evaluates the request, response and final policies of tasks
storage Storage // loads group tasks and stores task and taskList history
cache Cache // receives the response of each successfully executed task
workers int // capacity of the semaphore that limits concurrent executions
pollInterval time.Duration // delay before each poll of the queue
httpClient *http.Client // used for tasks that define a URL and a Method
logger *zap.Logger
}
// New creates a ListExecutor from its dependencies.
//
// queue provides the taskLists, policy evaluates task policies, storage
// loads group tasks and stores history, and cache receives task responses.
// workers limits the number of taskLists executed concurrently and
// pollInterval is the delay before each poll of the queue. httpClient is
// used for HTTP tasks.
func New(
	queue Queue,
	policy Policy,
	storage Storage,
	cache Cache,
	workers int,
	pollInterval time.Duration,
	httpClient *http.Client,
	logger *zap.Logger,
) *ListExecutor {
	e := new(ListExecutor)

	e.queue = queue
	e.policy = policy
	e.storage = storage
	e.cache = cache

	e.workers = workers
	e.pollInterval = pollInterval

	e.httpClient = httpClient
	e.logger = logger

	return e
}
// Start runs the polling loop until ctx is cancelled and always returns
// ctx.Err().
//
// After every pollInterval one taskList is polled from the queue and executed
// in its own goroutine. A buffered channel of capacity l.workers acts as a
// semaphore, so Start blocks before launching a new execution while l.workers
// executions are in flight. Once ctx is done, Start waits for the running
// executions to finish before returning.
func (l *ListExecutor) Start(ctx context.Context) error {
	defer l.logger.Info("taskList executor stopped")

	// every running execution holds one slot of this channel
	slots := make(chan token, l.workers)

	for running := true; running; {
		select {
		case <-ctx.Done():
			running = false
		case <-time.After(l.pollInterval):
			list, err := l.queue.PollList(ctx)
			switch {
			case err == nil:
				slots <- token{} // take a slot; blocks while all workers are busy
				go func() {
					l.Execute(ctx, list)
					<-slots // give the slot back
				}()
			case !errors.Is(errors.NotFound, err):
				// an empty queue is expected and not worth logging
				l.logger.Error("error getting taskList from queue", zap.Error(err))
			}
		}
	}

	// filling every slot is only possible after all executions released theirs
	for taken := 0; taken < l.workers; taken++ {
		slots <- token{}
	}

	return ctx.Err()
}
// Execute runs all groups of the taskList one after another, then stores the
// taskList in history and acks it in the queue.
//
// The list is marked as pending while it runs. It ends up as failed if any
// group returned an error or finished in the failed state, and as done
// otherwise. StartTime and FinishTime are set around the execution. Errors
// from saving history or acking are logged and not returned.
func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) {
	logger := l.logger.With(
		zap.String("taskListID", list.ID),
		zap.String("taskListName", list.Name),
	)

	list.State = taskpkg.Pending
	list.StartTime = time.Now()

	// execute groups sequentially
	for i := range list.Groups {
		// work on the slice element itself so state changes are kept
		group := &list.Groups[i]
		if err := l.executeGroup(ctx, group); err != nil {
			logger.Error("error executing group", zap.Error(err))
			group.State = taskpkg.Failed
		}

		// A failed task does not make executeGroup return an error, it is
		// only reported through the group state, so the state has to be
		// checked too; otherwise the list would be stored as done.
		if group.State == taskpkg.Failed {
			list.State = taskpkg.Failed
		}
	}

	if list.State != taskpkg.Failed {
		list.State = taskpkg.Done
	}
	list.FinishTime = time.Now()

	if err := l.storage.SaveTaskListHistory(ctx, list); err != nil {
		logger.Error("error saving taskList history", zap.Error(err))
	} else {
		logger.Debug("taskList history is saved")
	}

	if err := l.queue.AckList(ctx, list); err != nil {
		logger.Error("failed to ack taskList in queue", zap.Error(err))
	}
}
// executeGroup runs the group according to its Execution mode.
//
// Sequential groups are delegated to executeSequential. Parallel groups
// yield a "not implemented" error and any other mode yields an
// "unknown type of group execution" error.
func (l *ListExecutor) executeGroup(ctx context.Context, group *tasklist.Group) error {
	if group.Execution == sequential {
		return l.executeSequential(ctx, group)
	}

	if group.Execution == parallel {
		return errors.New("not implemented")
	}

	return errors.New("unknown type of group execution")
}
// executeSequential runs the tasks of the group one after another.
//
// The first task receives the group request; every following task receives
// the response of the previous one. When a task fails, the group is marked as
// failed and all remaining tasks are marked as failed without being executed.
// The response of every successful task is stored in the cache and the task
// is saved to history. Failed tasks are saved to history as well, because
// all tasks of the group are removed from the queue at the end and would
// otherwise be lost.
//
// An error is returned only when the tasks of the group cannot be loaded. A
// failed task is reported through group.State, not through the return value.
func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Group) error {
	group.State = taskpkg.Pending

	tasks, err := l.storage.GetGroupTasks(ctx, group)
	if err != nil {
		return err
	}

	// saveFailed keeps a record of a task that did not complete, so that its
	// failed state is still available after the task is acked below.
	saveFailed := func(task *taskpkg.Task, logger *zap.Logger) {
		if err := l.storage.SaveTaskHistory(ctx, task); err != nil {
			logger.Error("error saving task history", zap.Error(err))
			return
		}
		logger.Debug("task history is saved")
	}

	req := group.Request
	for _, task := range tasks {
		logger := l.logger.With(
			zap.String("taskID", task.ID),
			zap.String("taskName", task.Name),
		)

		// mark all subsequent tasks as failed if one task already failed
		if group.State == taskpkg.Failed {
			task.State = taskpkg.Failed
			saveFailed(task, logger)
			continue
		}

		task.Request = req
		if err := l.executeTask(ctx, task); err != nil {
			task.State = taskpkg.Failed
			group.State = taskpkg.Failed
			logger.Error("error executing task", zap.Error(err))
			saveFailed(task, logger)
			continue
		}
		logger.Debug("task execution completed successfully")

		// pass the response from current task to the next task;
		// a missing response is passed on as an empty, non-nil request
		req = task.Response
		if len(task.Response) == 0 {
			req = []byte{}
		}

		if err := l.cache.Set(
			ctx,
			task.ID,
			task.CacheNamespace,
			task.CacheScope,
			task.Response,
		); err != nil {
			logger.Error("error storing task result in cache", zap.Error(err))
			continue
		}
		logger.Debug("task results are stored in cache")

		if err := l.storage.SaveTaskHistory(ctx, task); err != nil {
			logger.Error("error saving task history", zap.Error(err))
			continue
		}
		logger.Debug("task history is saved")
	}

	// remove tasks from queue
	if err := l.queue.AckGroupTasks(ctx, group); err != nil {
		l.logger.With(zap.String("groupID", group.ID)).Error("failed to ack group tasks in queue", zap.Error(err))
	}

	if group.State != taskpkg.Failed {
		group.State = taskpkg.Done
	}

	return nil
}
// executeTask executes a single task and records the outcome on the task.
//
// A task with a request policy is executed by evaluating that policy with the
// task request; its response code is set to http.StatusOK. Otherwise a task
// with both URL and Method is executed as an HTTP call and gets the status
// code of the HTTP response. A task with neither is rejected as invalid.
// If set, the response policy and then the final policy are evaluated with
// the current response, and each result replaces the task response.
//
// On success the task state is set to done and FinishedAt is set. On error
// the state and FinishedAt are left untouched.
func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) error {
	task.StartedAt = time.Now()

	// task is executing a request policy OR an HTTP call to predefined URL
	switch {
	case task.RequestPolicy != "":
		out, err := l.policy.Evaluate(ctx, task.RequestPolicy, task.Request)
		if err != nil {
			return errors.New("error evaluating request policy", err)
		}
		task.ResponseCode = http.StatusOK
		task.Response = out
	case task.URL != "" && task.Method != "":
		code, out, err := l.doHTTPTask(ctx, task)
		if err != nil {
			return err
		}
		task.ResponseCode = code
		task.Response = out
	default:
		return errors.New(errors.Internal, "invalid task: must define either request policy or url")
	}

	// response policy first, finalizer policy second; each one is optional
	// and overwrites the task response with its own result
	postProcessing := [...]struct {
		policy  string
		failure string
	}{
		{policy: task.ResponsePolicy, failure: "error evaluating response policy"},
		{policy: task.FinalPolicy, failure: "error evaluating final policy"},
	}
	for _, step := range postProcessing {
		if step.policy == "" {
			continue
		}
		out, err := l.policy.Evaluate(ctx, step.policy, task.Response)
		if err != nil {
			return errors.New(step.failure, err)
		}
		task.Response = out
	}

	task.State = taskpkg.Done
	task.FinishedAt = time.Now()

	return nil
}
// doHTTPTask sends the task request as the body of an HTTP call to the task
// URL using the task Method, and returns the status code and the complete
// response body.
//
// The request is bound to ctx. The status code is returned as is; a non-2xx
// status is not treated as an error here. Errors from creating the request,
// executing it or reading the body are wrapped and returned with a zero
// status and a nil response.
func (l *ListExecutor) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
	req, err := http.NewRequestWithContext(ctx, task.Method, task.URL, bytes.NewReader(task.Request))
	if err != nil {
		return 0, nil, errors.New("error creating http request", err)
	}

	resp, err := l.httpClient.Do(req)
	if err != nil {
		return 0, nil, errors.New("error executing http request", err)
	}
	defer resp.Body.Close() // nolint:errcheck

	response, err = io.ReadAll(resp.Body)
	if err != nil {
		return 0, nil, errors.New("error reading response body", err)
	}

	return resp.StatusCode, response, nil
}
......@@ -121,7 +121,7 @@ func (s *Service) TaskResult(ctx context.Context, req *goatask.TaskResultRequest
}
}
if task.State != Done {
if task.State != Done && task.State != Failed {
return nil, errors.New(errors.NotFound, "no result, task is not completed")
}
......
......@@ -18,6 +18,10 @@ const (
// Done state is when the task is completed.
// TODO(penkovski): do we need this state if task is deleted after it is done?
Done = "done"
// Failed state is when the task execution failed but the task is part
// of a group and later execution is not possible, so it could not be "Unack"-ed
Failed = "failed"
)
type Task struct {
......
......@@ -72,7 +72,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq
taskList := &TaskList{
ID: uuid.NewString(),
Groups: createGroups(template),
Groups: createGroups(template, taskListRequest),
Name: template.Name,
Request: taskListRequest,
CacheScope: template.CacheScope,
......@@ -105,7 +105,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq
}, nil
}
func createGroups(t *Template) []Group {
func createGroups(t *Template, req []byte) []Group {
var groups []Group
for _, group := range t.Groups {
g := Group{
......@@ -114,6 +114,7 @@ func createGroups(t *Template) []Group {
Tasks: group.Tasks,
State: task.Created,
Metadata: group.Metadata,
Request: req,
FinalPolicy: group.FinalPolicy,
}
groups = append(groups, g)
......
......@@ -38,6 +38,7 @@ type Group struct {
Execution string `json:"execution"`
Tasks []string `json:"tasks"`
State task.State `json:"state"`
Request []byte `json:"request"`
Metadata interface{} `json:"metadata"` // TODO(penkovski): not yet clear
FinalPolicy string `json:"finalPolicy"`
}
......@@ -21,6 +21,7 @@ const (
tasksHistory = "tasksHistory"
taskListQueue = "taskLists"
taskListTemplates = "taskListTemplates"
taskListHistory = "taskListHistory"
)
type Storage struct {
......@@ -29,6 +30,7 @@ type Storage struct {
tasksHistory *mongo.Collection
taskLists *mongo.Collection
taskListTemplates *mongo.Collection
taskListHistory *mongo.Collection
}
func New(db *mongo.Client) *Storage {
......@@ -38,6 +40,7 @@ func New(db *mongo.Client) *Storage {
tasksHistory: db.Database(taskDB).Collection(tasksHistory),
taskListTemplates: db.Database(taskDB).Collection(taskListTemplates),
taskLists: db.Database(taskDB).Collection(taskListQueue),
taskListHistory: db.Database(taskDB).Collection(taskListHistory),
}
}
......@@ -238,3 +241,77 @@ func (s *Storage) AckList(ctx context.Context, taskList *tasklist.TaskList) erro
_, err := s.taskLists.DeleteOne(ctx, bson.M{"id": taskList.ID})
return err
}
// PollList retrieves one taskList in state "created" from the taskLists
// collection, sorted ascending by "createdAt" so that older ones are
// retrieved first (FIFO). In the same operation it updates the state of the
// taskList to "pending", so that subsequent calls to PollList do not
// retrieve the same taskList. The returned taskList is the updated document.
//
// If no taskList matches, an error of kind errors.NotFound is returned.
func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) {
	opts := options.
		FindOneAndUpdate().
		SetSort(bson.M{"createdAt": 1}).
		SetReturnDocument(options.After)

	filter := bson.M{"state": task.Created}
	update := bson.M{"$set": bson.M{"state": task.Pending}}
	result := s.taskLists.FindOneAndUpdate(
		ctx,
		filter,
		update,
		opts,
	)

	if err := result.Err(); err != nil {
		// compare against the driver's sentinel instead of matching the
		// text of the error message
		if err == mongo.ErrNoDocuments {
			return nil, errors.New(errors.NotFound, "taskList not found")
		}
		return nil, err
	}

	var list tasklist.TaskList
	if err := result.Decode(&list); err != nil {
		return nil, err
	}

	return &list, nil
}
// GetGroupTasks fetches all tasks whose "groupid" equals the ID of the given
// group, sorted ascending by "createdat".
//
// An error is returned if the query fails, if a document cannot be decoded,
// or if the cursor stops because of an error; a partial result is never
// returned.
//
// NOTE(review): PollList in this file sorts on "createdAt" while this query
// sorts on "createdat" — confirm which spelling matches the stored field.
func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) {
	filter := bson.M{"groupid": group.ID}
	opts := options.Find().SetSort(bson.M{"createdat": 1})

	cursor, err := s.tasks.Find(ctx, filter, opts)
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx) // nolint:errcheck

	var tasks []*task.Task
	for cursor.Next(ctx) {
		// named t so that it does not shadow the task package
		var t task.Task
		if err := cursor.Decode(&t); err != nil {
			return nil, err
		}
		tasks = append(tasks, &t)
	}

	// Next also returns false when iteration fails, so the cursor error
	// must be checked to tell a failure from the end of the result
	if err := cursor.Err(); err != nil {
		return nil, err
	}

	return tasks, nil
}
// AckGroupTasks deletes every task whose "groupid" equals the ID of the given
// group from the tasks collection. Only the error of the delete operation is
// reported; the number of deleted tasks is ignored.
func (s *Storage) AckGroupTasks(ctx context.Context, group *tasklist.Group) error {
	byGroup := bson.M{"groupid": group.ID}
	if _, err := s.tasks.DeleteMany(ctx, byGroup); err != nil {
		return err
	}
	return nil
}
// SaveTaskListHistory inserts the taskList into the taskListHistory
// collection. The insert is retried with an exponential backoff that is
// bound to ctx.
func (s *Storage) SaveTaskListHistory(ctx context.Context, taskList *tasklist.TaskList) error {
	retryPolicy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)

	return backoff.Retry(func() error {
		_, err := s.taskListHistory.InsertOne(ctx, taskList)
		return err
	}, retryPolicy)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment