Skip to content
Snippets Groups Projects
Commit 8cc33a3a authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Make tasks HTTP URL and Method optional

We changed the design, so now tasks are either executing
a request policy, or an HTTP request to a predefined URL,
but not both.

In the previous commits "request policy" could only be used as
a request transformation and the actual task was executing an
HTTP request to a predefined URL. With the current change tasks
can execute "request policy" if it's set, or make an HTTP call.
parent 0d894d1d
Branches
Tags
1 merge request!4Task execution of independent tasks
Pipeline #50639 passed
...@@ -64,6 +64,7 @@ func (w *Worker) Start(ctx context.Context) { ...@@ -64,6 +64,7 @@ func (w *Worker) Start(ctx context.Context) {
} }
continue continue
} }
logger.Debug("task execution completed successfully")
if err := w.cache.Set( if err := w.cache.Set(
ctx, ctx,
...@@ -78,11 +79,13 @@ func (w *Worker) Start(ctx context.Context) { ...@@ -78,11 +79,13 @@ func (w *Worker) Start(ctx context.Context) {
} }
continue continue
} }
logger.Debug("task results are stored in cache")
if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { if err := w.storage.SaveTaskHistory(ctx, executed); err != nil {
logger.Error("error saving task history", zap.Error(err)) logger.Error("error saving task history", zap.Error(err))
continue continue
} }
logger.Debug("task history is saved")
// remove task from queue // remove task from queue
if err := w.queue.Ack(ctx, executed); err != nil { if err := w.queue.Ack(ctx, executed); err != nil {
...@@ -95,23 +98,26 @@ func (w *Worker) Start(ctx context.Context) { ...@@ -95,23 +98,26 @@ func (w *Worker) Start(ctx context.Context) {
func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) { func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) {
task.StartedAt = time.Now() task.StartedAt = time.Now()
// evaluate request policy var response []byte
var err error
// task is executing a request policy OR an HTTP call to predefined URL
if task.RequestPolicy != "" { if task.RequestPolicy != "" {
resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) response, err = w.policy.Evaluate(ctx, task.RequestPolicy, task.Request)
if err != nil { if err != nil {
return nil, errors.New("error evaluating request policy", err) return nil, errors.New("error evaluating request policy", err)
} }
// overwrite task request with the one returned from the policy } else if task.URL != "" && task.Method != "" {
task.Request = resp var status int
} status, response, err = w.doHTTPTask(ctx, task)
if err != nil {
status, response, err := w.do(ctx, task) return nil, err
if err != nil { }
return nil, err task.ResponseCode = status
} else {
return nil, errors.New(errors.Internal, "invalid task: must define either request policy or url")
} }
task.State = taskpkg.Done
task.ResponseCode = status
task.Response = response task.Response = response
// evaluate response policy // evaluate response policy
...@@ -134,11 +140,12 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task ...@@ -134,11 +140,12 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task
task.Response = resp task.Response = resp
} }
task.State = taskpkg.Done
task.FinishedAt = time.Now() task.FinishedAt = time.Now()
return task, nil return task, nil
} }
func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { func (w *Worker) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request))
if err != nil { if err != nil {
return 0, nil, errors.New("error creating http request", err) return 0, nil, errors.New("error creating http request", err)
......
...@@ -25,8 +25,8 @@ type Task struct { ...@@ -25,8 +25,8 @@ type Task struct {
GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`. GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`.
Name string `json:"name"` // Name is used by external callers use to create tasks. Name string `json:"name"` // Name is used by external callers use to create tasks.
State State `json:"state"` // State of the task. State State `json:"state"` // State of the task.
URL string `json:"url"` // URL against which the task request will be executed. URL string `json:"url"` // URL against which the task request will be executed (optional).
Method string `json:"method"` // HTTP method of the task request. Method string `json:"method"` // HTTP method of the task request (optional).
Request []byte `json:"request"` // Request body which will be sent in the task request. Request []byte `json:"request"` // Request body which will be sent in the task request.
Response []byte `json:"response"` // Response received after the task request is executed. Response []byte `json:"response"` // Response received after the task request is executed.
ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed. ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment