diff --git a/README.md b/README.md index 9e3091872a7adb3dc1855c44a36c0a144dc5280b..c5e80af9f480c29ae049f9768b25ded7cd3335d4 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # Task Service -The task service provides an HTTP interface for executing asynchronous (HTTP) tasks. +The task service provides an HTTP interface for executing asynchronous (HTTP) tasks and task lists. It is developed using the [Goa v3](https://goa.design/) framework. @@ -32,113 +32,14 @@ immediate response with the `taskID` for the created task and can later query the state of task and retrieve its result either by directly querying the Cache service, or by querying the task HTTP interface for task results. -### Task Definition - -A tasks is described in JSON format and base task (template) definition *must* be available -before a task can be created. Task JSON templates are stored in Mongo database in a collection -named `taskTemplates`. Some task properties are statically predefined in the template, -others are populated dynamically by the input request and the output response. Below is -an example of task template definition: - -```json -{ - "name":"exampleTask", - "url":"https://jsonplaceholder.typicode.com/todos/1", - "method":"GET", - "requestPolicy":"example/example/1.0", - "responsePolicy":"", - "finalPolicy":"", - "cacheNamespace":"login", - "cacheScope":"user" -} -``` - -Tasks are created by using their `name` attribute. If a task template with the given -`name` does not exist, a task will not be created and an error is returned. - -### Task Execution - -Below is an example of creating a task with the template definition given above: -```shell -curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}' -``` - -The HTTP request will create a task for asynchronous execution and the JSON object -given as input will be used as the body of the task request. The caller will receive -immediately the `taskID` as response, and the result of the asynchronous task -execution will be stored in the TSA Cache after the task is completed. - -### Task Executor Configuration - -There are two environment variables that control the level of concurrency -of the task executor. - -```shell -EXECUTOR_WORKERS="5" -EXECUTOR_POLL_INTERVAL="1s" -``` - -Poll interval specifies how often the executor will try to fetch a *pending* task -from the queue. After a task is fetched, it is given to one of the workers for execution. -With the given example of 1 second (1s), the executor will retrieve 1 task per second at most. -If there are multiple instances (pods) of the service, multiply by their number -(e.g. 5 pods = 5 tasks per second). - -If this is not enough, the poll interval can be decreased, or we can slightly modify -the polling function to fetch many tasks at once (and also increase the number of workers). - -That leads us to the next section and a question. - -### Why the Queue is Database - -Why we decided to use a database as queue instead of a universal message queue product -like Kafka, so that the executor won't need to poll for new tasks, but will instead -receive them as they come? - -1. The TSA requirements document describes a functionality for task groups. -These are groups of tasks which may be executed sequentially with later tasks in the -group depending on the results of the previous tasks in the group. This means that we -can't just execute a task when an event for its creation has arrived. We have to keep -persistent execution state through multiple task executions and multiple service instances -(e.g. pods/executors) for which a database seems like more natural choice. - -2. Tasks are not just simple messages, but contain state. The state may change during -the lifecycle and execution progress of the task. The state must also be persistent, -auditable and should be available for querying by clients. A database seems more suitable -to us for implementing these features than a simple delivery message queue. - -The downside of our current approach is that the database is constantly polled by the -executor for new tasks. In practice this should not be a problem, because the task collection -containing *pending* tasks for execution should contain very small number of records. -Even at peak load, it should contain a few hundred tasks at most, because after tasks -are executed they are removed from the queue collection. We expect that the queue collection -will be empty most of the time and even when it isn't, there won't be many records inside. -Practically speaking, the query to retrieve tasks for execution should be very fast and -light in terms of database load. - -The benefits of this approach are that it's simple to implement and reason about. - -> If you have better ideas, or these arguments sound strange, please get in touch with us -> and we'll consider other options and improvements to the current model. - -### Task Storage - -We use MongoDB for tasks storage. There are three Mongo collections with different purpose. - -1. **taskTemplates** - -The collection contains predefined task definitions in JSON format. Here are defined -what tasks can be created and executed by the service. +### More information +* [Tasks](docs/task.md) +* [Task lists](docs/task-list.md) +* [Queue](docs/queue.md) -2. **tasks** -The collection contains newly created tasks *pending* for execution. It acts like a -FIFO queue and is used by the task executor to retrieve tasks for workers to execute. -3. **tasksHistory** -The collection contains successfully completed tasks for results querying, -audit, reporting and debugging purposes. ### Tests and Linters diff --git a/docs/new.json b/docs/new.json new file mode 100644 index 0000000000000000000000000000000000000000..69e021678d052f0f868a24a39ab818911a6b47c5 --- /dev/null +++ b/docs/new.json @@ -0,0 +1,22 @@ +{ + "name": "example", + "cacheNamespace": "login", + "cacheScope": "user", + "groups": [ + { + "execution": "sequential", + "tasks": [ + "task1", + "task2" + ] + }, + { + "execution": "parallel", + "tasks": [ + "task3", + "task4", + "task5" + ] + } + ] +} \ No newline at end of file diff --git a/docs/queue.md b/docs/queue.md new file mode 100644 index 0000000000000000000000000000000000000000..dc976358d41f23a9ce6cc0115040394c56bd8eb9 --- /dev/null +++ b/docs/queue.md @@ -0,0 +1,33 @@ +#Task service - Queue + +### Why the Queue is Database + +Why we decided to use a database as queue instead of a universal message queue product +like Kafka, so that the executor won't need to poll for new tasks, but will instead +receive them as they come? + +1. The TSA requirements document describes a functionality for task groups. + These are groups of tasks which may be executed sequentially with later tasks in the + group depending on the results of the previous tasks in the group. This means that we + can't just execute a task when an event for its creation has arrived. We have to keep + persistent execution state through multiple task executions and multiple service instances + (e.g. pods/executors) for which a database seems like more natural choice. + +2. Tasks are not just simple messages, but contain state. The state may change during + the lifecycle and execution progress of the task. The state must also be persistent, + auditable and should be available for querying by clients. A database seems more suitable + to us for implementing these features than a simple delivery message queue. + +The downside of our current approach is that the database is constantly polled by the +executor for new tasks. In practice this should not be a problem, because the task collection +containing *pending* tasks for execution should contain very small number of records. +Even at peak load, it should contain a few hundred tasks at most, because after tasks +are executed they are removed from the queue collection. We expect that the queue collection +will be empty most of the time and even when it isn't, there won't be many records inside. +Practically speaking, the query to retrieve tasks for execution should be very fast and +light in terms of database load. + +The benefits of this approach are that it's simple to implement and reason about. + +> If you have better ideas, or these arguments sound strange, please get in touch with us +> and we'll consider other options and improvements to the current model. diff --git a/docs/task-list.md b/docs/task-list.md new file mode 100644 index 0000000000000000000000000000000000000000..5b2b4fcf9e170d9923132004f31ebbb840d56376 --- /dev/null +++ b/docs/task-list.md @@ -0,0 +1,107 @@ +# Task service - Task list + +### Task list definition + +A task list is described in JSON format and base task list (template) definition *must* be available +before a task list can be created. Task list JSON templates are stored in Mongo database in a collection +named `taskListTemplates`. Below is an example of task list template definition: + +```json +{ + "name": "example", + "cacheNamespace": "login", + "cacheScope": "user", + "groups": [ + { + "execution": "sequential", + "tasks": [ + "taskName1", + "taskName2" + ] + }, + { + "execution": "parallel", + "tasks": [ + "taskName3", + "taskName4", + "taskName5" + ] + } + ] +} +``` + +Task lists are created by using their `name` attribute. Tasks for each group in the `groups` field +are also created by using their `name` so task names should match existing task template +definitions (see [tasks](task.md)). If a task list template with the given `name` does not exist OR a task +template definition does not exist, a task list will not be created and an error is returned. + +### Task list Execution + +Below is an example of creating a task list with the template definition given above: +```shell +curl -v -X POST http://localhost:8082/v1/taskList/example -d '{"input": {"key": "value"}}' +``` + +The HTTP request will create a task list and corresponding tasks for each of the groups within the task list +for asynchronous execution and the JSON object given as input will be used as the body of the task list request. +The caller will receive immediately the `taskListID` as response. + +The executor then takes a task list from the queue and executes the groups within the task list sequentially. +Each group has a field called `execution` which can be one of two options: `parallel` or `sequential`. This +field describes how the tasks within the group *must* be executed. + - Sequential group execution: tasks within the group are executed sequentially and the result of each task is passed as +an input to the next. If one task fails to execute, all following tasks are marked as failed and the whole group fails. + - Parallel group execution: tasks within the group are executed in parallel and the results are dependant. If a task +fails to execute, this does not affect the other tasks but the group is marked with failed status. + +The state of the task list asynchronous execution is available later on the `result` endpoint: +```shell +curl -v -X GET http://localhost:8082/v1/taskListResult/{taskListID} +``` + +### Task list Executor Configuration + +There are two environment variables that control the level of concurrency +of the task list executor. + +```shell +LIST_EXECUTOR_WORKERS="5" +LIST_EXECUTOR_POLL_INTERVAL="1s" +``` + +Poll interval specifies how often the executor will try to fetch a *pending* task list +from the queue. After a task list is fetched, it is executed in parallel with other executions. +Number of workers is the limit of permitted parallel task lists executions. +With the given example of 1 second (1s), the executor will retrieve 1 task list per second at most. +If there are multiple instances (pods) of the service, multiply by their number +(e.g. 5 pods = 5 tasks per second). + +If this is not enough, the poll interval can be decreased, or we can slightly modify +the polling function to fetch many task lists at once (and also increase the number of workers). + +To learn more about the queue and why we use database as queue see [queue](queue.md) + +### Task List Storage + +We use MongoDB for task list storage. There are three Mongo collections with different purpose. + +1. **taskListTemplates** + +The collection contains predefined task list definitions in JSON format. Each definition contains +groups of tasks which must be instantiated and later executed as part of the task list. + +2. **taskLists** + +The collection contains newly created task lists *pending* for execution. It acts like a +FIFO queue and is used by the task list executor to retrieve task lists for workers to execute. + +3. **tasks** + +The collection contains the tasks belonging to a group which is part of a task list. When a task list +is fetched for execution, all tasks are fetched and executed for that particular task list. + +4. **tasksListHistory** + +The collection contains completed task lists for results querying, +audit, reporting and debugging purposes. diff --git a/docs/task.md b/docs/task.md new file mode 100644 index 0000000000000000000000000000000000000000..c023af206b6bad0d539b1d9b2d1b47311b7846ef --- /dev/null +++ b/docs/task.md @@ -0,0 +1,77 @@ +# Task Service - Task Documentation + +### Task Definition + +A tasks is described in JSON format and base task (template) definition *must* be available +before a task can be created. Task JSON templates are stored in Mongo database in a collection +named `taskTemplates`. Some task properties are statically predefined in the template, +others are populated dynamically by the input request and the output response. Below is +an example of task template definition: + +```json +{ + "name":"exampleTask", + "url":"https://jsonplaceholder.typicode.com/todos/1", + "method":"GET", + "requestPolicy":"example/example/1.0", + "responsePolicy":"", + "finalPolicy":"", + "cacheNamespace":"login", + "cacheScope":"user" +} +``` + +Tasks are created by using their `name` attribute. If a task template with the given +`name` does not exist, a task will not be created and an error is returned. + +### Task Execution + +Below is an example of creating a task with the template definition given above: +```shell +curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}' +``` + +The HTTP request will create a task for asynchronous execution and the JSON object +given as input will be used as the body of the task request. The caller will receive +immediately the `taskID` as response, and the result of the asynchronous task +execution will be stored in the TSA Cache after the task is completed. + +### Task Executor Configuration + +There are two environment variables that control the level of concurrency +of the task executor. + +```shell +EXECUTOR_WORKERS="5" +EXECUTOR_POLL_INTERVAL="1s" +``` + +Poll interval specifies how often the executor will try to fetch a *pending* task +from the queue. After a task is fetched, it is given to one of the workers for execution. +With the given example of 1 second (1s), the executor will retrieve 1 task per second at most. +If there are multiple instances (pods) of the service, multiply by their number +(e.g. 5 pods = 5 tasks per second). + +If this is not enough, the poll interval can be decreased, or we can slightly modify +the polling function to fetch many tasks at once (and also increase the number of workers). + +To learn more about the queue and why we use database as queue see [queue](queue.md) + +### Task Storage + +We use MongoDB for tasks storage. There are three Mongo collections with different purpose. + +1. **taskTemplates** + +The collection contains predefined task definitions in JSON format. Here are defined +what tasks can be created and executed by the service. + +2. **tasks** + +The collection contains newly created tasks *pending* for execution. It acts like a +FIFO queue and is used by the task executor to retrieve tasks for workers to execute. + +3. **tasksHistory** + +The collection contains successfully completed tasks for results querying, +audit, reporting and debugging purposes. diff --git a/internal/config/config.go b/internal/config/config.go index 582623bfd107cbcaf889edf5a68b900f1e4e5e6f..42b6cce22080f379c5bdc7095eee55bb894df94c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -38,7 +38,7 @@ type executorConfig struct { type listExecutorConfig struct { Workers int `envconfig:"LIST_EXECUTOR_WORKERS" default:"5"` - PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` + PollInterval time.Duration `envconfig:"LIST_EXECUTOR_POLL_INTERVAL" default:"1s"` } type cacheConfig struct { diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go index cff312d11a13201a2ebb2cad50ef080a1180e164..a2a4db65e39ebf8266593b993296d47577218a3d 100644 --- a/internal/listexecutor/listexecutor.go +++ b/internal/listexecutor/listexecutor.go @@ -190,11 +190,8 @@ func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Gr } logger.Debug("task execution completed successfully") - // pass the response from current task to the next task + // pass the response from current task as an input to the next task req = task.Response - if bytes.Compare(task.Response, []byte(nil)) == 0 { - req = []byte{} - } if err := l.cache.Set( ctx, diff --git a/internal/service/tasklist/service.go b/internal/service/tasklist/service.go index 2aec2fcfe1c20a6fd2ebfafe3d48d26b1ecfb89a..bc996f5bfdbdd9978fd581edd1c13ddb5f9aca7a 100644 --- a/internal/service/tasklist/service.go +++ b/internal/service/tasklist/service.go @@ -113,7 +113,6 @@ func createGroups(t *Template, req []byte) []Group { Execution: group.Execution, Tasks: group.Tasks, State: task.Created, - Metadata: group.Metadata, Request: req, FinalPolicy: group.FinalPolicy, } diff --git a/internal/service/tasklist/task_list.go b/internal/service/tasklist/task_list.go index c01cd1be6a50ed8552e6026fda82f222bfee181b..065e72071705c22644d84d0143862846e4c045a2 100644 --- a/internal/service/tasklist/task_list.go +++ b/internal/service/tasklist/task_list.go @@ -14,10 +14,9 @@ type Template struct { } type GroupTemplate struct { - Execution string `json:"execution"` - FinalPolicy string `json:"finalPolicy"` - Tasks []string `json:"tasks"` - Metadata interface{} `json:"metadata"` + Execution string `json:"execution"` + FinalPolicy string `json:"finalPolicy"` + Tasks []string `json:"tasks"` } type TaskList struct { @@ -34,11 +33,10 @@ type TaskList struct { } type Group struct { - ID string `json:"id"` - Execution string `json:"execution"` - Tasks []string `json:"tasks"` - State task.State `json:"state"` - Request []byte `json:"request"` - Metadata interface{} `json:"metadata"` // TODO(penkovski): not yet clear - FinalPolicy string `json:"finalPolicy"` + ID string `json:"id"` + Execution string `json:"execution"` + Tasks []string `json:"tasks"` + State task.State `json:"state"` + Request []byte `json:"request"` + FinalPolicy string `json:"finalPolicy"` }