diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go index 530e0eee6c67926ef6f7dab60241daad6c861b04..b05bad414c12af2880cfce1296ea5c4473bbe1eb 100644 --- a/internal/listexecutor/listexecutor.go +++ b/internal/listexecutor/listexecutor.go @@ -5,6 +5,7 @@ import ( "context" "io" "net/http" + "sync" "time" "go.uber.org/zap" @@ -154,7 +155,7 @@ func (l *ListExecutor) executeGroup(ctx context.Context, group *tasklist.Group) case sequential: return l.executeSequential(ctx, group) case parallel: - return errors.New("not implemented") + return l.executeParallel(ctx, group) } return errors.New("unknown type of group execution") @@ -225,6 +226,69 @@ func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Gr return nil } +func (l *ListExecutor) executeParallel(ctx context.Context, group *tasklist.Group) error { + group.State = taskpkg.Pending + + tasks, err := l.storage.GetGroupTasks(ctx, group) + if err != nil { + return err + } + + var wg sync.WaitGroup + for _, task := range tasks { + wg.Add(1) + go func(t *taskpkg.Task) { + defer wg.Done() + logger := l.logger.With( + zap.String("taskID", t.ID), + zap.String("taskName", t.Name), + ) + // pass group request to each task + t.Request = group.Request + + if err := l.executeTask(ctx, t); err != nil { + t.State = taskpkg.Failed + group.State = taskpkg.Failed + logger.Error("error executing task", zap.Error(err)) + return + } + logger.Debug("task execution completed successfully") + + if err := l.cache.Set( + ctx, + t.ID, + t.CacheNamespace, + t.CacheScope, + t.Response, + ); err != nil { + logger.Error("error storing task result in cache", zap.Error(err)) + return + } + logger.Debug("task results are stored in cache") + + if err := l.storage.SaveTaskHistory(ctx, t); err != nil { + logger.Error("error saving task history", zap.Error(err)) + return + } + logger.Debug("task history is saved") + }(task) + } + + // wait for all tasks to be executed + wg.Wait() + + // remove tasks from queue + if err := l.queue.AckGroupTasks(ctx, group); err != nil { + l.logger.With(zap.String("groupID", group.ID)).Error("failed to ack group tasks in queue", zap.Error(err)) + } + + if group.State != taskpkg.Failed { + group.State = taskpkg.Done + } + + return nil +} + func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) error { task.StartedAt = time.Now() @@ -277,7 +341,11 @@ func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) erro } func (l *ListExecutor) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { - req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + reqBody := task.Request + if task.Method == http.MethodGet { + reqBody = nil + } + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(reqBody)) if err != nil { return 0, nil, errors.New("error creating http request", err) }