Skip to content
Snippets Groups Projects
Commit 07bdc57c authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

Merge branch '14-parallel-task-list-executor' into 'main'

Parallel task list executor

Closes #14

See merge request !9
parents 5257b298 8d56fba0
No related branches found
Tags v1.1.0
1 merge request!9Parallel task list executor
Pipeline #51513 passed with stage
in 43 seconds
......@@ -5,6 +5,7 @@ import (
"context"
"io"
"net/http"
"sync"
"time"
"go.uber.org/zap"
......@@ -154,7 +155,7 @@ func (l *ListExecutor) executeGroup(ctx context.Context, group *tasklist.Group)
case sequential:
return l.executeSequential(ctx, group)
case parallel:
return errors.New("not implemented")
return l.executeParallel(ctx, group)
}
return errors.New("unknown type of group execution")
......@@ -225,6 +226,69 @@ func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Gr
return nil
}
func (l *ListExecutor) executeParallel(ctx context.Context, group *tasklist.Group) error {
group.State = taskpkg.Pending
tasks, err := l.storage.GetGroupTasks(ctx, group)
if err != nil {
return err
}
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(t *taskpkg.Task) {
defer wg.Done()
logger := l.logger.With(
zap.String("taskID", t.ID),
zap.String("taskName", t.Name),
)
// pass group request to each task
t.Request = group.Request
if err := l.executeTask(ctx, t); err != nil {
t.State = taskpkg.Failed
group.State = taskpkg.Failed
logger.Error("error executing task", zap.Error(err))
return
}
logger.Debug("task execution completed successfully")
if err := l.cache.Set(
ctx,
t.ID,
t.CacheNamespace,
t.CacheScope,
t.Response,
); err != nil {
logger.Error("error storing task result in cache", zap.Error(err))
return
}
logger.Debug("task results are stored in cache")
if err := l.storage.SaveTaskHistory(ctx, t); err != nil {
logger.Error("error saving task history", zap.Error(err))
return
}
logger.Debug("task history is saved")
}(task)
}
// wait for all tasks to be executed
wg.Wait()
// remove tasks from queue
if err := l.queue.AckGroupTasks(ctx, group); err != nil {
l.logger.With(zap.String("groupID", group.ID)).Error("failed to ack group tasks in queue", zap.Error(err))
}
if group.State != taskpkg.Failed {
group.State = taskpkg.Done
}
return nil
}
func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) error {
task.StartedAt = time.Now()
......@@ -277,7 +341,11 @@ func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) erro
}
func (l *ListExecutor) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request))
reqBody := task.Request
if task.Method == http.MethodGet {
reqBody = nil
}
req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(reqBody))
if err != nil {
return 0, nil, errors.New("error creating http request", err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment