Skip to content
Snippets Groups Projects
Commit 5bddb39d authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#13 documentation

parent 066e96f4
No related branches found
No related tags found
1 merge request!8Sequential Task List executor
Pipeline #51136 failed with stage
in 50 seconds
......@@ -3,7 +3,7 @@
# Task Service
The task service provides an HTTP interface for executing asynchronous (HTTP) tasks.
The task service provides an HTTP interface for executing asynchronous (HTTP) tasks and task lists.
It is developed using the [Goa v3](https://goa.design/) framework.
......@@ -32,113 +32,14 @@ immediate response with the `taskID` for the created task and can later query
the state of task and retrieve its result either by directly querying the Cache
service, or by querying the task HTTP interface for task results.
### Task Definition
A tasks is described in JSON format and base task (template) definition *must* be available
before a task can be created. Task JSON templates are stored in Mongo database in a collection
named `taskTemplates`. Some task properties are statically predefined in the template,
others are populated dynamically by the input request and the output response. Below is
an example of task template definition:
```json
{
"name":"exampleTask",
"url":"https://jsonplaceholder.typicode.com/todos/1",
"method":"GET",
"requestPolicy":"example/example/1.0",
"responsePolicy":"",
"finalPolicy":"",
"cacheNamespace":"login",
"cacheScope":"user"
}
```
Tasks are created by using their `name` attribute. If a task template with the given
`name` does not exist, a task will not be created and an error is returned.
### Task Execution
Below is an example of creating a task with the template definition given above:
```shell
curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}'
```
The HTTP request will create a task for asynchronous execution and the JSON object
given as input will be used as the body of the task request. The caller will receive
immediately the `taskID` as response, and the result of the asynchronous task
execution will be stored in the TSA Cache after the task is completed.
### Task Executor Configuration
There are two environment variables that control the level of concurrency
of the task executor.
```shell
EXECUTOR_WORKERS="5"
EXECUTOR_POLL_INTERVAL="1s"
```
Poll interval specifies how often the executor will try to fetch a *pending* task
from the queue. After a task is fetched, it is given to one of the workers for execution.
With the given example of 1 second (1s), the executor will retrieve 1 task per second at most.
If there are multiple instances (pods) of the service, multiply by their number
(e.g. 5 pods = 5 tasks per second).
If this is not enough, the poll interval can be decreased, or we can slightly modify
the polling function to fetch many tasks at once (and also increase the number of workers).
That leads us to the next section and a question.
### Why the Queue is Database
Why we decided to use a database as queue instead of a universal message queue product
like Kafka, so that the executor won't need to poll for new tasks, but will instead
receive them as they come?
1. The TSA requirements document describes a functionality for task groups.
These are groups of tasks which may be executed sequentially with later tasks in the
group depending on the results of the previous tasks in the group. This means that we
can't just execute a task when an event for its creation has arrived. We have to keep
persistent execution state through multiple task executions and multiple service instances
(e.g. pods/executors) for which a database seems like more natural choice.
2. Tasks are not just simple messages, but contain state. The state may change during
the lifecycle and execution progress of the task. The state must also be persistent,
auditable and should be available for querying by clients. A database seems more suitable
to us for implementing these features than a simple delivery message queue.
The downside of our current approach is that the database is constantly polled by the
executor for new tasks. In practice this should not be a problem, because the task collection
containing *pending* tasks for execution should contain very small number of records.
Even at peak load, it should contain a few hundred tasks at most, because after tasks
are executed they are removed from the queue collection. We expect that the queue collection
will be empty most of the time and even when it isn't, there won't be many records inside.
Practically speaking, the query to retrieve tasks for execution should be very fast and
light in terms of database load.
The benefits of this approach are that it's simple to implement and reason about.
> If you have better ideas, or these arguments sound strange, please get in touch with us
> and we'll consider other options and improvements to the current model.
### Task Storage
We use MongoDB for tasks storage. There are three Mongo collections with different purpose.
1. **taskTemplates**
The collection contains predefined task definitions in JSON format. Here are defined
what tasks can be created and executed by the service.
### More information
* [Tasks](docs/task.md)
* [Task lists](docs/task-list.md)
* [Queue](docs/queue.md)
2. **tasks**
The collection contains newly created tasks *pending* for execution. It acts like a
FIFO queue and is used by the task executor to retrieve tasks for workers to execute.
3. **tasksHistory**
The collection contains successfully completed tasks for results querying,
audit, reporting and debugging purposes.
### Tests and Linters
......
{
"name": "example",
"cacheNamespace": "login",
"cacheScope": "user",
"groups": [
{
"execution": "sequential",
"tasks": [
"task1",
"task2"
]
},
{
"execution": "parallel",
"tasks": [
"task3",
"task4",
"task5"
]
}
]
}
\ No newline at end of file
#Task service - Queue
### Why the Queue is Database
Why we decided to use a database as queue instead of a universal message queue product
like Kafka, so that the executor won't need to poll for new tasks, but will instead
receive them as they come?
1. The TSA requirements document describes a functionality for task groups.
These are groups of tasks which may be executed sequentially with later tasks in the
group depending on the results of the previous tasks in the group. This means that we
can't just execute a task when an event for its creation has arrived. We have to keep
persistent execution state through multiple task executions and multiple service instances
(e.g. pods/executors) for which a database seems like more natural choice.
2. Tasks are not just simple messages, but contain state. The state may change during
the lifecycle and execution progress of the task. The state must also be persistent,
auditable and should be available for querying by clients. A database seems more suitable
to us for implementing these features than a simple delivery message queue.
The downside of our current approach is that the database is constantly polled by the
executor for new tasks. In practice this should not be a problem, because the task collection
containing *pending* tasks for execution should contain very small number of records.
Even at peak load, it should contain a few hundred tasks at most, because after tasks
are executed they are removed from the queue collection. We expect that the queue collection
will be empty most of the time and even when it isn't, there won't be many records inside.
Practically speaking, the query to retrieve tasks for execution should be very fast and
light in terms of database load.
The benefits of this approach are that it's simple to implement and reason about.
> If you have better ideas, or these arguments sound strange, please get in touch with us
> and we'll consider other options and improvements to the current model.
# Task service - Task list
### Task list definition
A task list is described in JSON format and base task list (template) definition *must* be available
before a task list can be created. Task list JSON templates are stored in Mongo database in a collection
named `taskListTemplates`. Below is an example of task list template definition:
```json
{
"name": "example",
"cacheNamespace": "login",
"cacheScope": "user",
"groups": [
{
"execution": "sequential",
"tasks": [
"taskName1",
"taskName2"
]
},
{
"execution": "parallel",
"tasks": [
"taskName3",
"taskName4",
"taskName5"
]
}
]
}
```
Task lists are created by using their `name` attribute. Tasks for each group in the `groups` field
are also created by using their `name` so task names should match existing task template
definitions (see [tasks](task.md)). If a task list template with the given `name` does not exist OR a task
template definition does not exist, a task list will not be created and an error is returned.
### Task list Execution
Below is an example of creating a task list with the template definition given above:
```shell
curl -v -X POST http://localhost:8082/v1/taskList/example -d '{"input": {"key": "value"}}'
```
The HTTP request will create a task list and corresponding tasks for each of the groups within the task list
for asynchronous execution and the JSON object given as input will be used as the body of the task list request.
The caller will receive immediately the `taskListID` as response.
The executor then takes a task list from the queue and executes the groups within the task list sequentially.
Each group has a field called `execution` which can be one of two options: `parallel` or `sequential`. This
field describes how the tasks within the group *must* be executed.
- Sequential group execution: tasks within the group are executed sequentially and the result of each task is passed as
an input to the next. If one task fails to execute, all following tasks are marked as failed and the whole group fails.
- Parallel group execution: tasks within the group are executed in parallel and the results are dependant. If a task
fails to execute, this does not affect the other tasks but the group is marked with failed status.
The state of the task list asynchronous execution is available later on the `result` endpoint:
```shell
curl -v -X GET http://localhost:8082/v1/taskListResult/{taskListID}
```
### Task list Executor Configuration
There are two environment variables that control the level of concurrency
of the task list executor.
```shell
LIST_EXECUTOR_WORKERS="5"
LIST_EXECUTOR_POLL_INTERVAL="1s"
```
Poll interval specifies how often the executor will try to fetch a *pending* task list
from the queue. After a task list is fetched, it is executed in parallel with other executions.
Number of workers is the limit of permitted parallel task lists executions.
With the given example of 1 second (1s), the executor will retrieve 1 task list per second at most.
If there are multiple instances (pods) of the service, multiply by their number
(e.g. 5 pods = 5 tasks per second).
If this is not enough, the poll interval can be decreased, or we can slightly modify
the polling function to fetch many task lists at once (and also increase the number of workers).
To learn more about the queue and why we use database as queue see [queue](queue.md)
### Task List Storage
We use MongoDB for task list storage. There are three Mongo collections with different purpose.
1. **taskListTemplates**
The collection contains predefined task list definitions in JSON format. Each definition contains
groups of tasks which must be instantiated and later executed as part of the task list.
2. **taskLists**
The collection contains newly created task lists *pending* for execution. It acts like a
FIFO queue and is used by the task list executor to retrieve task lists for workers to execute.
3. **tasks**
The collection contains the tasks belonging to a group which is part of a task list. When a task list
is fetched for execution, all tasks are fetched and executed for that particular task list.
4. **tasksListHistory**
The collection contains completed task lists for results querying,
audit, reporting and debugging purposes.
# Task Service - Task Documentation
### Task Definition
A tasks is described in JSON format and base task (template) definition *must* be available
before a task can be created. Task JSON templates are stored in Mongo database in a collection
named `taskTemplates`. Some task properties are statically predefined in the template,
others are populated dynamically by the input request and the output response. Below is
an example of task template definition:
```json
{
"name":"exampleTask",
"url":"https://jsonplaceholder.typicode.com/todos/1",
"method":"GET",
"requestPolicy":"example/example/1.0",
"responsePolicy":"",
"finalPolicy":"",
"cacheNamespace":"login",
"cacheScope":"user"
}
```
Tasks are created by using their `name` attribute. If a task template with the given
`name` does not exist, a task will not be created and an error is returned.
### Task Execution
Below is an example of creating a task with the template definition given above:
```shell
curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}'
```
The HTTP request will create a task for asynchronous execution and the JSON object
given as input will be used as the body of the task request. The caller will receive
immediately the `taskID` as response, and the result of the asynchronous task
execution will be stored in the TSA Cache after the task is completed.
### Task Executor Configuration
There are two environment variables that control the level of concurrency
of the task executor.
```shell
EXECUTOR_WORKERS="5"
EXECUTOR_POLL_INTERVAL="1s"
```
Poll interval specifies how often the executor will try to fetch a *pending* task
from the queue. After a task is fetched, it is given to one of the workers for execution.
With the given example of 1 second (1s), the executor will retrieve 1 task per second at most.
If there are multiple instances (pods) of the service, multiply by their number
(e.g. 5 pods = 5 tasks per second).
If this is not enough, the poll interval can be decreased, or we can slightly modify
the polling function to fetch many tasks at once (and also increase the number of workers).
To learn more about the queue and why we use database as queue see [queue](queue.md)
### Task Storage
We use MongoDB for tasks storage. There are three Mongo collections with different purpose.
1. **taskTemplates**
The collection contains predefined task definitions in JSON format. Here are defined
what tasks can be created and executed by the service.
2. **tasks**
The collection contains newly created tasks *pending* for execution. It acts like a
FIFO queue and is used by the task executor to retrieve tasks for workers to execute.
3. **tasksHistory**
The collection contains successfully completed tasks for results querying,
audit, reporting and debugging purposes.
......@@ -38,7 +38,7 @@ type executorConfig struct {
type listExecutorConfig struct {
Workers int `envconfig:"LIST_EXECUTOR_WORKERS" default:"5"`
PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
PollInterval time.Duration `envconfig:"LIST_EXECUTOR_POLL_INTERVAL" default:"1s"`
}
type cacheConfig struct {
......
......@@ -190,11 +190,8 @@ func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Gr
}
logger.Debug("task execution completed successfully")
// pass the response from current task to the next task
// pass the response from current task as an input to the next task
req = task.Response
if bytes.Compare(task.Response, []byte(nil)) == 0 {
req = []byte{}
}
if err := l.cache.Set(
ctx,
......
......@@ -113,7 +113,6 @@ func createGroups(t *Template, req []byte) []Group {
Execution: group.Execution,
Tasks: group.Tasks,
State: task.Created,
Metadata: group.Metadata,
Request: req,
FinalPolicy: group.FinalPolicy,
}
......
......@@ -14,10 +14,9 @@ type Template struct {
}
type GroupTemplate struct {
Execution string `json:"execution"`
FinalPolicy string `json:"finalPolicy"`
Tasks []string `json:"tasks"`
Metadata interface{} `json:"metadata"`
Execution string `json:"execution"`
FinalPolicy string `json:"finalPolicy"`
Tasks []string `json:"tasks"`
}
type TaskList struct {
......@@ -34,11 +33,10 @@ type TaskList struct {
}
type Group struct {
ID string `json:"id"`
Execution string `json:"execution"`
Tasks []string `json:"tasks"`
State task.State `json:"state"`
Request []byte `json:"request"`
Metadata interface{} `json:"metadata"` // TODO(penkovski): not yet clear
FinalPolicy string `json:"finalPolicy"`
ID string `json:"id"`
Execution string `json:"execution"`
Tasks []string `json:"tasks"`
State task.State `json:"state"`
Request []byte `json:"request"`
FinalPolicy string `json:"finalPolicy"`
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment