diff --git a/internal/executor/worker.go b/internal/executor/worker.go index dd36b3dabe5404128292e358a83b6e937b32cf75..feb9d3f81eddbd82698bd4d14ca373a6572cc1e5 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -64,6 +64,7 @@ func (w *Worker) Start(ctx context.Context) { } continue } + logger.Debug("task execution completed successfully") if err := w.cache.Set( ctx, @@ -78,11 +79,13 @@ func (w *Worker) Start(ctx context.Context) { } continue } + logger.Debug("task results are stored in cache") if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { logger.Error("error saving task history", zap.Error(err)) continue } + logger.Debug("task history is saved") // remove task from queue if err := w.queue.Ack(ctx, executed); err != nil { @@ -95,23 +98,26 @@ func (w *Worker) Start(ctx context.Context) { func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) { task.StartedAt = time.Now() - // evaluate request policy + var response []byte + var err error + + // task is executing a request policy OR an HTTP call to predefined URL if task.RequestPolicy != "" { - resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + response, err = w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) if err != nil { return nil, errors.New("error evaluating request policy", err) } - // overwrite task request with the one returned from the policy - task.Request = resp - } - - status, response, err := w.do(ctx, task) - if err != nil { - return nil, err + } else if task.URL != "" && task.Method != "" { + var status int + status, response, err = w.doHTTPTask(ctx, task) + if err != nil { + return nil, err + } + task.ResponseCode = status + } else { + return nil, errors.New(errors.Internal, "invalid task: must define either request policy or url") } - task.State = taskpkg.Done - task.ResponseCode = status task.Response = response // evaluate response policy @@ -134,11 +140,12 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task task.Response = resp } + task.State = taskpkg.Done task.FinishedAt = time.Now() return task, nil } -func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { +func (w *Worker) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) if err != nil { return 0, nil, errors.New("error creating http request", err) diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 962a887a2dd4b954d7207216fc9612e0c494d486..dd53a5175c505258f3822fa0ee6e8cdd57533dd7 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -25,8 +25,8 @@ type Task struct { GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`. Name string `json:"name"` // Name is used by external callers use to create tasks. State State `json:"state"` // State of the task. - URL string `json:"url"` // URL against which the task request will be executed. - Method string `json:"method"` // HTTP method of the task request. + URL string `json:"url"` // URL against which the task request will be executed (optional). + Method string `json:"method"` // HTTP method of the task request (optional). Request []byte `json:"request"` // Request body which will be sent in the task request. Response []byte `json:"response"` // Response received after the task request is executed. ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed.