
Task Service

The task service provides an HTTP interface for executing asynchronous (HTTP) tasks.

It is developed using the Goa v3 framework.

While the service is up and running, you can see a live Swagger API description at servicehost:serviceport/swagger-ui. In the local docker-compose environment, the Swagger URL is available at http://localhost:8082/swagger-ui/

High-level Overview

flowchart LR  
	A([client]) -- HTTP --> B[Task API] 
	subgraph task 
		B --> C[(tasks DB)] 
		C --> D[Executor] 
	end 
	D --> E[Policy]
	D --> F[Cache]

Tasks are created by clients making HTTP requests. Newly created tasks are stored in a persistent database that is used as a queue. An executor component retrieves tasks from the queue for execution. Clients receive an immediate response containing the taskID of the created task and can later query the state of the task and retrieve its result, either by querying the Cache service directly or by querying the task HTTP interface for task results.

Task Definition

A task is described in JSON format, and a base task (template) definition must be available before a task can be created. Task JSON templates are stored in a MongoDB collection named taskTemplates. Some task properties are statically predefined in the template; others are populated dynamically from the input request and the output response. Below is an example of a task template definition:

{
    "name":"exampleTask",
    "url":"https://jsonplaceholder.typicode.com/todos/1",
    "method":"GET",
    "requestPolicy":"example/example/1.0",
    "responsePolicy":"",
    "finalPolicy":"",
    "cacheNamespace":"login",
    "cacheScope":"user"
}

Tasks are created by referencing a template's name attribute. If a task template with the given name does not exist, the task is not created and an error is returned.
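For illustration, below is a minimal Go sketch of how such a template could be modeled and looked up by name. The struct fields mirror the JSON example above; the package, type and function names are assumptions for this example and not necessarily the ones used in the service code.

package task // illustrative package name

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// Template mirrors the task template JSON shown above.
type Template struct {
	Name           string `bson:"name" json:"name"`
	URL            string `bson:"url" json:"url"`
	Method         string `bson:"method" json:"method"`
	RequestPolicy  string `bson:"requestPolicy" json:"requestPolicy"`
	ResponsePolicy string `bson:"responsePolicy" json:"responsePolicy"`
	FinalPolicy    string `bson:"finalPolicy" json:"finalPolicy"`
	CacheNamespace string `bson:"cacheNamespace" json:"cacheNamespace"`
	CacheScope     string `bson:"cacheScope" json:"cacheScope"`
}

// TemplateByName fetches a template from the taskTemplates collection.
// mongo.ErrNoDocuments is returned when no template with that name exists,
// which the service can map to an error response for the client.
func TemplateByName(ctx context.Context, db *mongo.Database, name string) (*Template, error) {
	var t Template
	if err := db.Collection("taskTemplates").FindOne(ctx, bson.M{"name": name}).Decode(&t); err != nil {
		return nil, err
	}
	return &t, nil
}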

Task Execution

Below is an example of creating a task with the template definition given above:

curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}'

The HTTP request creates a task for asynchronous execution, and the JSON object given as input is used as the body of the task request. The caller immediately receives the taskID in the response; the result of the asynchronous task execution is stored in the TSA Cache after the task is completed.
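For illustration, here is a rough sketch of what a worker could do with such a task once the executor hands it over: send the client-supplied body to the URL and method taken from the template, then write the response to the cache. The Task and Cache types and the cache.Set signature are assumptions made for this example, and policy evaluation is omitted.

package task // illustrative; not the actual service code

import (
	"bytes"
	"context"
	"io"
	"net/http"
)

// Task is a simplified stand-in for a queued task: the template fields
// merged with the request body supplied by the client.
type Task struct {
	ID             string
	Method         string
	URL            string
	Request        []byte
	CacheNamespace string
	CacheScope     string
}

// Cache is an assumed minimal interface to the TSA Cache service.
type Cache interface {
	Set(ctx context.Context, key, namespace, scope string, value []byte) error
}

// executeTask performs the HTTP call described by the task and stores the
// result in the cache keyed by taskID, namespace and scope.
func executeTask(ctx context.Context, client *http.Client, cache Cache, t *Task) error {
	req, err := http.NewRequestWithContext(ctx, t.Method, t.URL, bytes.NewReader(t.Request))
	if err != nil {
		return err
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	result, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	return cache.Set(ctx, t.ID, t.CacheNamespace, t.CacheScope, result)
}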

Task Executor Configuration

There are two environment variables that control the level of concurrency of the task executor.

EXECUTOR_WORKERS="5"
EXECUTOR_POLL_INTERVAL="1s"

The poll interval specifies how often the executor tries to fetch a pending task from the queue. After a task is fetched, it is handed to one of the workers for execution. With the example value of 1 second (1s), the executor will retrieve at most 1 task per second. If there are multiple instances (pods) of the service, multiply by their number (e.g. 5 pods = at most 5 tasks per second).
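To make the two settings concrete, here is a minimal, self-contained sketch of a poll-and-dispatch loop driven by EXECUTOR_WORKERS and EXECUTOR_POLL_INTERVAL. All names (runExecutor, fetchNext, process) and the stub implementations are illustrative only, not the service's actual API, and error handling is kept deliberately short.

package main // self-contained sketch of the poll-and-dispatch loop

import (
	"context"
	"log"
	"os"
	"strconv"
	"sync"
	"time"
)

// Task is a stand-in for the queued task document.
type Task struct{ ID string }

// fetchNext is a placeholder for retrieving the oldest pending task from
// the tasks collection; it returns nil when the queue is empty.
func fetchNext(ctx context.Context) (*Task, error) { return nil, nil }

// process is a placeholder for the actual task execution (the HTTP call,
// policy evaluation and writing the result to the cache).
func process(ctx context.Context, t *Task) error { return nil }

// runExecutor polls the queue every pollInterval and hands fetched tasks
// to a fixed pool of workers.
func runExecutor(ctx context.Context, workers int, pollInterval time.Duration) {
	tasks := make(chan *Task)

	// Start the worker pool (EXECUTOR_WORKERS).
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				if err := process(ctx, t); err != nil {
					log.Printf("task %s failed: %v", t.ID, err)
				}
			}
		}()
	}

	// Poll the queue (EXECUTOR_POLL_INTERVAL): at most one task per tick.
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			close(tasks)
			wg.Wait()
			return
		case <-ticker.C:
			t, err := fetchNext(ctx)
			if err != nil || t == nil {
				continue // queue empty or transient error; try again next tick
			}
			tasks <- t // blocks until a worker is free
		}
	}
}

func main() {
	workers, err := strconv.Atoi(os.Getenv("EXECUTOR_WORKERS"))
	if err != nil {
		workers = 5
	}
	interval, err := time.ParseDuration(os.Getenv("EXECUTOR_POLL_INTERVAL"))
	if err != nil {
		interval = time.Second
	}
	runExecutor(context.Background(), workers, interval)
}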

If this is not enough, the poll interval can be decreased, or the polling function can be slightly modified to fetch multiple tasks at once (with the number of workers increased accordingly).

That leads us to the next section and a question.

Why the Queue Is a Database

Why did we decide to use a database as a queue instead of a general-purpose message queue product like Kafka, which would let the executor receive tasks as they arrive instead of polling for them?

  1. The TSA requirements document describes functionality for task groups. These are groups of tasks that may be executed sequentially, with later tasks in the group depending on the results of previous tasks in the group. This means we can't simply execute a task the moment an event for its creation arrives. We have to keep persistent execution state across multiple task executions and multiple service instances (e.g. pods/executors), for which a database seems like the more natural choice.

  2. Tasks are not just simple messages; they contain state. The state may change during the lifecycle and execution progress of the task. It must also be persistent, auditable and available for clients to query. A database seems more suitable to us for implementing these features than a simple message-delivery queue.

The downside of our current approach is that the database is constantly polled by the executor for new tasks. In practice this should not be a problem, because the tasks collection holding pending tasks should contain only a small number of records. Even at peak load it should hold a few hundred tasks at most, because tasks are removed from the queue collection after they are executed. We expect the queue collection to be empty most of the time, and even when it isn't, there won't be many records inside. Practically speaking, the query that retrieves tasks for execution should be very fast and light in terms of database load.

The benefit of this approach is that it is simple to implement and reason about.

If you have better ideas, or these arguments sound strange, please get in touch with us and we'll consider other options and improvements to the current model.

Task Storage

We use MongoDB for task storage. There are three Mongo collections, each with a different purpose.

  1. taskTemplates

The collection contains predefined task definitions in JSON format. It defines which tasks can be created and executed by the service.

  2. tasks

The collection contains newly created tasks pending execution. It acts as a FIFO queue and is used by the task executor to retrieve tasks for the workers to execute (see the sketch after this list).

  3. tasksHistory

The collection contains successfully completed tasks, kept for result querying, audit, reporting and debugging purposes.
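As an illustration of the FIFO behaviour of the tasks collection, here is one way the executor could atomically claim the oldest pending task with the official Go MongoDB driver. This is a sketch only: sorting by _id merely approximates insertion order, and the actual service may use a different query or remove tasks at a different point in their lifecycle.

package task // illustrative; not the actual service code

import (
	"context"
	"errors"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// claimOldestTask atomically removes and returns the oldest pending task
// from the tasks collection. A nil result with a nil error means the
// queue is currently empty.
func claimOldestTask(ctx context.Context, db *mongo.Database) (bson.M, error) {
	opts := options.FindOneAndDelete().SetSort(bson.D{{Key: "_id", Value: 1}})

	var task bson.M
	err := db.Collection("tasks").FindOneAndDelete(ctx, bson.M{}, opts).Decode(&task)
	if errors.Is(err, mongo.ErrNoDocuments) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return task, nil
}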

Tests and Linters

To execute the unit tests for the service, go to the root project directory and run:

go test -race ./...

To run the linters, go to the root project directory and run:

golangci-lint run

Dependencies and Vendor

The project uses Go modules for managing dependencies, and we commit the vendor directory. When you add/change dependencies, be sure to clean and update the vendor directory before submitting your Merge Request for review.

go mod tidy
go mod vendor