diff --git a/README.md b/README.md index 9e3091872a7adb3dc1855c44a36c0a144dc5280b..c5e80af9f480c29ae049f9768b25ded7cd3335d4 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # Task Service -The task service provides an HTTP interface for executing asynchronous (HTTP) tasks. +The task service provides an HTTP interface for executing asynchronous (HTTP) tasks and task lists. It is developed using the [Goa v3](https://goa.design/) framework. @@ -32,113 +32,14 @@ immediate response with the `taskID` for the created task and can later query the state of task and retrieve its result either by directly querying the Cache service, or by querying the task HTTP interface for task results. -### Task Definition - -A tasks is described in JSON format and base task (template) definition *must* be available -before a task can be created. Task JSON templates are stored in Mongo database in a collection -named `taskTemplates`. Some task properties are statically predefined in the template, -others are populated dynamically by the input request and the output response. Below is -an example of task template definition: - -```json -{ - "name":"exampleTask", - "url":"https://jsonplaceholder.typicode.com/todos/1", - "method":"GET", - "requestPolicy":"example/example/1.0", - "responsePolicy":"", - "finalPolicy":"", - "cacheNamespace":"login", - "cacheScope":"user" -} -``` - -Tasks are created by using their `name` attribute. If a task template with the given -`name` does not exist, a task will not be created and an error is returned. - -### Task Execution - -Below is an example of creating a task with the template definition given above: -```shell -curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}' -``` - -The HTTP request will create a task for asynchronous execution and the JSON object -given as input will be used as the body of the task request. The caller will receive -immediately the `taskID` as response, and the result of the asynchronous task -execution will be stored in the TSA Cache after the task is completed. - -### Task Executor Configuration - -There are two environment variables that control the level of concurrency -of the task executor. - -```shell -EXECUTOR_WORKERS="5" -EXECUTOR_POLL_INTERVAL="1s" -``` - -Poll interval specifies how often the executor will try to fetch a *pending* task -from the queue. After a task is fetched, it is given to one of the workers for execution. -With the given example of 1 second (1s), the executor will retrieve 1 task per second at most. -If there are multiple instances (pods) of the service, multiply by their number -(e.g. 5 pods = 5 tasks per second). - -If this is not enough, the poll interval can be decreased, or we can slightly modify -the polling function to fetch many tasks at once (and also increase the number of workers). - -That leads us to the next section and a question. - -### Why the Queue is Database - -Why we decided to use a database as queue instead of a universal message queue product -like Kafka, so that the executor won't need to poll for new tasks, but will instead -receive them as they come? - -1. The TSA requirements document describes a functionality for task groups. -These are groups of tasks which may be executed sequentially with later tasks in the -group depending on the results of the previous tasks in the group. This means that we -can't just execute a task when an event for its creation has arrived. We have to keep -persistent execution state through multiple task executions and multiple service instances -(e.g. pods/executors) for which a database seems like more natural choice. - -2. Tasks are not just simple messages, but contain state. The state may change during -the lifecycle and execution progress of the task. The state must also be persistent, -auditable and should be available for querying by clients. A database seems more suitable -to us for implementing these features than a simple delivery message queue. - -The downside of our current approach is that the database is constantly polled by the -executor for new tasks. In practice this should not be a problem, because the task collection -containing *pending* tasks for execution should contain very small number of records. -Even at peak load, it should contain a few hundred tasks at most, because after tasks -are executed they are removed from the queue collection. We expect that the queue collection -will be empty most of the time and even when it isn't, there won't be many records inside. -Practically speaking, the query to retrieve tasks for execution should be very fast and -light in terms of database load. - -The benefits of this approach are that it's simple to implement and reason about. - -> If you have better ideas, or these arguments sound strange, please get in touch with us -> and we'll consider other options and improvements to the current model. - -### Task Storage - -We use MongoDB for tasks storage. There are three Mongo collections with different purpose. - -1. **taskTemplates** - -The collection contains predefined task definitions in JSON format. Here are defined -what tasks can be created and executed by the service. +### More information +* [Tasks](docs/task.md) +* [Task lists](docs/task-list.md) +* [Queue](docs/queue.md) -2. **tasks** -The collection contains newly created tasks *pending* for execution. It acts like a -FIFO queue and is used by the task executor to retrieve tasks for workers to execute. -3. **tasksHistory** -The collection contains successfully completed tasks for results querying, -audit, reporting and debugging purposes. ### Tests and Linters diff --git a/cmd/task/main.go b/cmd/task/main.go index e31f03b83bafe6af209aa2a8d75d008fe0418428..d7853699eacdd6406531057c77fe0b3c8670c9c7 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -22,15 +22,19 @@ import ( goahealthsrv "code.vereign.com/gaiax/tsa/task/gen/http/health/server" goaopenapisrv "code.vereign.com/gaiax/tsa/task/gen/http/openapi/server" goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" + goatasklistsrv "code.vereign.com/gaiax/tsa/task/gen/http/task_list/server" "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" + goatasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" "code.vereign.com/gaiax/tsa/task/internal/clients/cache" "code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/executor" + "code.vereign.com/gaiax/tsa/task/internal/listexecutor" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" "code.vereign.com/gaiax/tsa/task/internal/storage" ) @@ -84,24 +88,39 @@ func main() { logger, ) + listExecutor := listexecutor.New( + storage, + policy, + storage, + cache, + cfg.ListExecutor.Workers, + cfg.ListExecutor.PollInterval, + httpClient(), + logger, + ) + // create services var ( - taskSvc goatask.Service - healthSvc goahealth.Service + taskSvc goatask.Service + taskListSvc goatasklist.Service + healthSvc goahealth.Service ) { taskSvc = task.New(storage, storage, cache, logger) + taskListSvc = tasklist.New(storage, storage, logger) healthSvc = health.New() } // create endpoints var ( - taskEndpoints *goatask.Endpoints - healthEndpoints *goahealth.Endpoints - openapiEndpoints *openapi.Endpoints + taskEndpoints *goatask.Endpoints + taskListEndpoints *goatasklist.Endpoints + healthEndpoints *goahealth.Endpoints + openapiEndpoints *openapi.Endpoints ) { taskEndpoints = goatask.NewEndpoints(taskSvc) + taskListEndpoints = goatasklist.NewEndpoints(taskListSvc) healthEndpoints = goahealth.NewEndpoints(healthSvc) openapiEndpoints = openapi.NewEndpoints(nil) } @@ -124,18 +143,21 @@ func main() { // the service input and output data structures to HTTP requests and // responses. var ( - taskServer *goatasksrv.Server - healthServer *goahealthsrv.Server - openapiServer *goaopenapisrv.Server + taskServer *goatasksrv.Server + taskListServer *goatasklistsrv.Server + healthServer *goahealthsrv.Server + openapiServer *goaopenapisrv.Server ) { taskServer = goatasksrv.New(taskEndpoints, mux, dec, enc, nil, errFormatter) + taskListServer = goatasklistsrv.New(taskListEndpoints, mux, dec, enc, nil, errFormatter) healthServer = goahealthsrv.New(healthEndpoints, mux, dec, enc, nil, errFormatter) openapiServer = goaopenapisrv.New(openapiEndpoints, mux, dec, enc, nil, errFormatter, nil, nil) } // Configure the mux. goatasksrv.Mount(mux, taskServer) + goatasklistsrv.Mount(mux, taskListServer) goahealthsrv.Mount(mux, healthServer) goaopenapisrv.Mount(mux, openapiServer) @@ -159,6 +181,9 @@ func main() { g.Go(func() error { return executor.Start(ctx) }) + g.Go(func() error { + return listExecutor.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } diff --git a/design/design.go b/design/design.go index f78f0680296dcbe79ae5a2edd5e413d12446c117..19bce998ec88c06dcd99234fa87b4ca34afb399f 100644 --- a/design/design.go +++ b/design/design.go @@ -20,8 +20,8 @@ var _ = Service("task", func() { Method("Create", func() { Description("Create a task and put it in a queue for execution.") - Payload(CreateRequest) - Result(CreateResult) + Payload(CreateTaskRequest) + Result(CreateTaskResult) HTTP(func() { POST("/v1/task/{taskName}") @@ -49,6 +49,30 @@ var _ = Service("task", func() { }) }) +var _ = Service("taskList", func() { + Description("TaskList service provides endpoints to work with task lists.") + + Method("Create", func() { + Description("Create a task list and corresponding tasks and put them in respective queues for execution.") + Payload(CreateTaskListRequest) + Result(CreateTaskListResult) + HTTP(func() { + POST("/v1/taskList/{taskListName}") + + Header("cacheNamespace:x-cache-namespace", String, "Cache key namespace", func() { + Example("login") + }) + Header("cacheScope:x-cache-scope", String, "Cache key scope", func() { + Example("user") + }) + + Body("data") + + Response(StatusOK) + }) + }) +}) + var _ = Service("health", func() { Description("Health service provides health check endpoints.") diff --git a/design/types.go b/design/types.go index 7625fe78437e5f08fb4a3a4c2c9cce2f699b316f..7178a145f82034751203139cd631fcca56bdbf0d 100644 --- a/design/types.go +++ b/design/types.go @@ -3,7 +3,7 @@ package design import . "goa.design/goa/v3/dsl" -var CreateRequest = Type("CreateRequest", func() { +var CreateTaskRequest = Type("CreateTaskRequest", func() { Field(1, "taskName", String, "Task name.") Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.") Field(3, "cacheNamespace", String, "Cache key namespace.") @@ -11,7 +11,7 @@ var CreateRequest = Type("CreateRequest", func() { Required("taskName", "data") }) -var CreateResult = Type("CreateResult", func() { +var CreateTaskResult = Type("CreateTaskResult", func() { Field(1, "taskID", String, "Unique task identifier.") Required("taskID") }) @@ -20,3 +20,16 @@ var TaskResultRequest = Type("TaskResultRequest", func() { Field(1, "taskID", String, "Unique task identifier.") Required("taskID") }) + +var CreateTaskListRequest = Type("CreateTaskListRequest", func() { + Field(1, "taskListName", String, "TaskList name.") + Field(2, "data", Any, "Data contains JSON payload that will be used for taskList execution.") + Field(3, "cacheNamespace", String, "Cache key namespace.") + Field(4, "cacheScope", String, "Cache key scope.") + Required("taskListName", "data") +}) + +var CreateTaskListResult = Type("CreateTaskListResult", func() { + Field(1, "taskListID", String, "Unique taskList identifier.") + Required("taskListID") +}) diff --git a/docs/queue.md b/docs/queue.md new file mode 100644 index 0000000000000000000000000000000000000000..dc976358d41f23a9ce6cc0115040394c56bd8eb9 --- /dev/null +++ b/docs/queue.md @@ -0,0 +1,33 @@ +#Task service - Queue + +### Why the Queue is Database + +Why we decided to use a database as queue instead of a universal message queue product +like Kafka, so that the executor won't need to poll for new tasks, but will instead +receive them as they come? + +1. The TSA requirements document describes a functionality for task groups. + These are groups of tasks which may be executed sequentially with later tasks in the + group depending on the results of the previous tasks in the group. This means that we + can't just execute a task when an event for its creation has arrived. We have to keep + persistent execution state through multiple task executions and multiple service instances + (e.g. pods/executors) for which a database seems like more natural choice. + +2. Tasks are not just simple messages, but contain state. The state may change during + the lifecycle and execution progress of the task. The state must also be persistent, + auditable and should be available for querying by clients. A database seems more suitable + to us for implementing these features than a simple delivery message queue. + +The downside of our current approach is that the database is constantly polled by the +executor for new tasks. In practice this should not be a problem, because the task collection +containing *pending* tasks for execution should contain very small number of records. +Even at peak load, it should contain a few hundred tasks at most, because after tasks +are executed they are removed from the queue collection. We expect that the queue collection +will be empty most of the time and even when it isn't, there won't be many records inside. +Practically speaking, the query to retrieve tasks for execution should be very fast and +light in terms of database load. + +The benefits of this approach are that it's simple to implement and reason about. + +> If you have better ideas, or these arguments sound strange, please get in touch with us +> and we'll consider other options and improvements to the current model. diff --git a/docs/task-list.md b/docs/task-list.md new file mode 100644 index 0000000000000000000000000000000000000000..5b2b4fcf9e170d9923132004f31ebbb840d56376 --- /dev/null +++ b/docs/task-list.md @@ -0,0 +1,107 @@ +# Task service - Task list + +### Task list definition + +A task list is described in JSON format and base task list (template) definition *must* be available +before a task list can be created. Task list JSON templates are stored in Mongo database in a collection +named `taskListTemplates`. Below is an example of task list template definition: + +```json +{ + "name": "example", + "cacheNamespace": "login", + "cacheScope": "user", + "groups": [ + { + "execution": "sequential", + "tasks": [ + "taskName1", + "taskName2" + ] + }, + { + "execution": "parallel", + "tasks": [ + "taskName3", + "taskName4", + "taskName5" + ] + } + ] +} +``` + +Task lists are created by using their `name` attribute. Tasks for each group in the `groups` field +are also created by using their `name` so task names should match existing task template +definitions (see [tasks](task.md)). If a task list template with the given `name` does not exist OR a task +template definition does not exist, a task list will not be created and an error is returned. + +### Task list Execution + +Below is an example of creating a task list with the template definition given above: +```shell +curl -v -X POST http://localhost:8082/v1/taskList/example -d '{"input": {"key": "value"}}' +``` + +The HTTP request will create a task list and corresponding tasks for each of the groups within the task list +for asynchronous execution and the JSON object given as input will be used as the body of the task list request. +The caller will receive immediately the `taskListID` as response. + +The executor then takes a task list from the queue and executes the groups within the task list sequentially. +Each group has a field called `execution` which can be one of two options: `parallel` or `sequential`. This +field describes how the tasks within the group *must* be executed. + - Sequential group execution: tasks within the group are executed sequentially and the result of each task is passed as +an input to the next. If one task fails to execute, all following tasks are marked as failed and the whole group fails. + - Parallel group execution: tasks within the group are executed in parallel and the results are dependant. If a task +fails to execute, this does not affect the other tasks but the group is marked with failed status. + +The state of the task list asynchronous execution is available later on the `result` endpoint: +```shell +curl -v -X GET http://localhost:8082/v1/taskListResult/{taskListID} +``` + +### Task list Executor Configuration + +There are two environment variables that control the level of concurrency +of the task list executor. + +```shell +LIST_EXECUTOR_WORKERS="5" +LIST_EXECUTOR_POLL_INTERVAL="1s" +``` + +Poll interval specifies how often the executor will try to fetch a *pending* task list +from the queue. After a task list is fetched, it is executed in parallel with other executions. +Number of workers is the limit of permitted parallel task lists executions. +With the given example of 1 second (1s), the executor will retrieve 1 task list per second at most. +If there are multiple instances (pods) of the service, multiply by their number +(e.g. 5 pods = 5 tasks per second). + +If this is not enough, the poll interval can be decreased, or we can slightly modify +the polling function to fetch many task lists at once (and also increase the number of workers). + +To learn more about the queue and why we use database as queue see [queue](queue.md) + +### Task List Storage + +We use MongoDB for task list storage. There are three Mongo collections with different purpose. + +1. **taskListTemplates** + +The collection contains predefined task list definitions in JSON format. Each definition contains +groups of tasks which must be instantiated and later executed as part of the task list. + +2. **taskLists** + +The collection contains newly created task lists *pending* for execution. It acts like a +FIFO queue and is used by the task list executor to retrieve task lists for workers to execute. + +3. **tasks** + +The collection contains the tasks belonging to a group which is part of a task list. When a task list +is fetched for execution, all tasks are fetched and executed for that particular task list. + +4. **tasksListHistory** + +The collection contains completed task lists for results querying, +audit, reporting and debugging purposes. diff --git a/docs/task.md b/docs/task.md new file mode 100644 index 0000000000000000000000000000000000000000..c023af206b6bad0d539b1d9b2d1b47311b7846ef --- /dev/null +++ b/docs/task.md @@ -0,0 +1,77 @@ +# Task Service - Task Documentation + +### Task Definition + +A tasks is described in JSON format and base task (template) definition *must* be available +before a task can be created. Task JSON templates are stored in Mongo database in a collection +named `taskTemplates`. Some task properties are statically predefined in the template, +others are populated dynamically by the input request and the output response. Below is +an example of task template definition: + +```json +{ + "name":"exampleTask", + "url":"https://jsonplaceholder.typicode.com/todos/1", + "method":"GET", + "requestPolicy":"example/example/1.0", + "responsePolicy":"", + "finalPolicy":"", + "cacheNamespace":"login", + "cacheScope":"user" +} +``` + +Tasks are created by using their `name` attribute. If a task template with the given +`name` does not exist, a task will not be created and an error is returned. + +### Task Execution + +Below is an example of creating a task with the template definition given above: +```shell +curl -v -X POST http://localhost:8082/v1/task/exampleTask -d '{"exampleInput":{"test":123}}' +``` + +The HTTP request will create a task for asynchronous execution and the JSON object +given as input will be used as the body of the task request. The caller will receive +immediately the `taskID` as response, and the result of the asynchronous task +execution will be stored in the TSA Cache after the task is completed. + +### Task Executor Configuration + +There are two environment variables that control the level of concurrency +of the task executor. + +```shell +EXECUTOR_WORKERS="5" +EXECUTOR_POLL_INTERVAL="1s" +``` + +Poll interval specifies how often the executor will try to fetch a *pending* task +from the queue. After a task is fetched, it is given to one of the workers for execution. +With the given example of 1 second (1s), the executor will retrieve 1 task per second at most. +If there are multiple instances (pods) of the service, multiply by their number +(e.g. 5 pods = 5 tasks per second). + +If this is not enough, the poll interval can be decreased, or we can slightly modify +the polling function to fetch many tasks at once (and also increase the number of workers). + +To learn more about the queue and why we use database as queue see [queue](queue.md) + +### Task Storage + +We use MongoDB for tasks storage. There are three Mongo collections with different purpose. + +1. **taskTemplates** + +The collection contains predefined task definitions in JSON format. Here are defined +what tasks can be created and executed by the service. + +2. **tasks** + +The collection contains newly created tasks *pending* for execution. It acts like a +FIFO queue and is used by the task executor to retrieve tasks for workers to execute. + +3. **tasksHistory** + +The collection contains successfully completed tasks for results querying, +audit, reporting and debugging purposes. diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go index ca12630d5df90424c74955045a70afd451dcb583..e125d9bae7407f0fabda3324cb9c3d1966b29ed4 100644 --- a/gen/http/cli/task/cli.go +++ b/gen/http/cli/task/cli.go @@ -15,6 +15,7 @@ import ( healthc "code.vereign.com/gaiax/tsa/task/gen/http/health/client" taskc "code.vereign.com/gaiax/tsa/task/gen/http/task/client" + tasklistc "code.vereign.com/gaiax/tsa/task/gen/http/task_list/client" goahttp "goa.design/goa/v3/http" goa "goa.design/goa/v3/pkg" ) @@ -26,13 +27,15 @@ import ( func UsageCommands() string { return `health (liveness|readiness) task (create|task-result) +task-list create ` } // UsageExamples produces an example of a valid invocation of the CLI tool. func UsageExamples() string { return os.Args[0] + ` health liveness` + "\n" + - os.Args[0] + ` task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et."` + "\n" + + os.Args[0] + ` task create --body "Ipsam et est accusantium." --task-name "Corrupti quia autem dolorum sunt aperiam quaerat." --cache-namespace "Eveniet et eligendi sint quibusdam quia maxime." --cache-scope "Et ipsa voluptate."` + "\n" + + os.Args[0] + ` task-list create --body "Rerum quod error est esse nisi." --task-list-name "Sapiente et." --cache-namespace "Delectus natus eos." --cache-scope "Quae ut dolores ab."` + "\n" + "" } @@ -62,6 +65,14 @@ func ParseEndpoint( taskTaskResultFlags = flag.NewFlagSet("task-result", flag.ExitOnError) taskTaskResultTaskIDFlag = taskTaskResultFlags.String("task-id", "REQUIRED", "Unique task identifier.") + + taskListFlags = flag.NewFlagSet("task-list", flag.ContinueOnError) + + taskListCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) + taskListCreateBodyFlag = taskListCreateFlags.String("body", "REQUIRED", "") + taskListCreateTaskListNameFlag = taskListCreateFlags.String("task-list-name", "REQUIRED", "TaskList name.") + taskListCreateCacheNamespaceFlag = taskListCreateFlags.String("cache-namespace", "", "") + taskListCreateCacheScopeFlag = taskListCreateFlags.String("cache-scope", "", "") ) healthFlags.Usage = healthUsage healthLivenessFlags.Usage = healthLivenessUsage @@ -71,6 +82,9 @@ func ParseEndpoint( taskCreateFlags.Usage = taskCreateUsage taskTaskResultFlags.Usage = taskTaskResultUsage + taskListFlags.Usage = taskListUsage + taskListCreateFlags.Usage = taskListCreateUsage + if err := flag.CommandLine.Parse(os.Args[1:]); err != nil { return nil, nil, err } @@ -90,6 +104,8 @@ func ParseEndpoint( svcf = healthFlags case "task": svcf = taskFlags + case "task-list": + svcf = taskListFlags default: return nil, nil, fmt.Errorf("unknown service %q", svcn) } @@ -125,6 +141,13 @@ func ParseEndpoint( } + case "task-list": + switch epn { + case "create": + epf = taskListCreateFlags + + } + } } if epf == nil { @@ -165,6 +188,13 @@ func ParseEndpoint( endpoint = c.TaskResult() data, err = taskc.BuildTaskResultPayload(*taskTaskResultTaskIDFlag) } + case "task-list": + c := tasklistc.NewClient(scheme, host, doer, enc, dec, restore) + switch epn { + case "create": + endpoint = c.Create() + data, err = tasklistc.BuildCreatePayload(*taskListCreateBodyFlag, *taskListCreateTaskListNameFlag, *taskListCreateCacheNamespaceFlag, *taskListCreateCacheScopeFlag) + } } } if err != nil { @@ -232,7 +262,7 @@ Create a task and put it in a queue for execution. -cache-scope STRING: Example: - %[1]s task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et." + %[1]s task create --body "Ipsam et est accusantium." --task-name "Corrupti quia autem dolorum sunt aperiam quaerat." --cache-namespace "Eveniet et eligendi sint quibusdam quia maxime." --cache-scope "Et ipsa voluptate." `, os.Args[0]) } @@ -243,6 +273,34 @@ TaskResult retrieves task result from the Cache service. -task-id STRING: Unique task identifier. Example: - %[1]s task task-result --task-id "Et ipsa voluptate." + %[1]s task task-result --task-id "Excepturi aut consequatur animi rerum." +`, os.Args[0]) +} + +// task-listUsage displays the usage of the task-list command and its +// subcommands. +func taskListUsage() { + fmt.Fprintf(os.Stderr, `TaskList service provides endpoints to work with task lists. +Usage: + %[1]s [globalflags] task-list COMMAND [flags] + +COMMAND: + create: Create a task list, corresponding groups and tasks and put them in respective queues for execution. + +Additional help: + %[1]s task-list COMMAND --help +`, os.Args[0]) +} +func taskListCreateUsage() { + fmt.Fprintf(os.Stderr, `%[1]s [flags] task-list create -body JSON -task-list-name STRING -cache-namespace STRING -cache-scope STRING + +Create a task list, corresponding groups and tasks and put them in respective queues for execution. + -body JSON: + -task-list-name STRING: TaskList name. + -cache-namespace STRING: + -cache-scope STRING: + +Example: + %[1]s task-list create --body "Rerum quod error est esse nisi." --task-list-name "Sapiente et." --cache-namespace "Delectus natus eos." --cache-scope "Quae ut dolores ab." `, os.Args[0]) } diff --git a/gen/http/openapi.json b/gen/http/openapi.json index 78e36c4475d9a753284914cfcbc51172644ffe72..13e4956be85c69bb6664de4204a7dfbcf60f9db0 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","format":"binary"}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Vel odio et doloribus est quod laborum."}},"example":{"taskID":"Harum aut autem aliquam dolorem non soluta."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}},"/v1/taskList/{taskListName}":{"post":{"tags":["taskList"],"summary":"Create taskList","description":"Create a task list, corresponding groups and tasks and put them in respective queues for execution.","operationId":"taskList#Create","parameters":[{"name":"taskListName","in":"path","description":"TaskList name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for taskList execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskListCreateResponseBody","required":["taskListID"]}}},"schemes":["http"]}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","format":"binary"}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et officiis aut."}},"example":{"taskID":"Atque veritatis."},"required":["taskID"]},"TaskListCreateResponseBody":{"title":"TaskListCreateResponseBody","type":"object","properties":{"taskListID":{"type":"string","description":"Unique taskList identifier.","example":"Deserunt dolor et autem quidem fugiat sint."}},"example":{"taskListID":"Aut voluptas possimus quia aliquam sit."},"required":["taskListID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index 1ccd3b61d33f7ff620ecc4d959899033d4d70ded..b9008d5a089c97a647cc5fb22c831bd7c86a9d92 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -74,6 +74,46 @@ paths: - taskID schemes: - http + /v1/taskList/{taskListName}: + post: + tags: + - taskList + summary: Create taskList + description: Create a task list, corresponding groups and tasks and put them + in respective queues for execution. + operationId: taskList#Create + parameters: + - name: taskListName + in: path + description: TaskList name. + required: true + type: string + - name: x-cache-namespace + in: header + description: Cache key namespace + required: false + type: string + - name: x-cache-scope + in: header + description: Cache key scope + required: false + type: string + - name: any + in: body + description: Data contains JSON payload that will be used for taskList execution. + required: true + schema: + type: string + format: binary + responses: + "200": + description: OK response. + schema: + $ref: '#/definitions/TaskListCreateResponseBody' + required: + - taskListID + schemes: + - http /v1/taskResult/{taskID}: get: tags: @@ -103,8 +143,20 @@ definitions: taskID: type: string description: Unique task identifier. - example: Vel odio et doloribus est quod laborum. + example: Et officiis aut. example: - taskID: Harum aut autem aliquam dolorem non soluta. + taskID: Atque veritatis. required: - taskID + TaskListCreateResponseBody: + title: TaskListCreateResponseBody + type: object + properties: + taskListID: + type: string + description: Unique taskList identifier. + example: Deserunt dolor et autem quidem fugiat sint. + example: + taskListID: Aut voluptas possimus quia aliquam sit. + required: + - taskListID diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index 926387963b08082fda67874bb159f6e86d4c8295..4a39db1b50f4e6ac9185165a43d3ae6c3b14c9f6 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Aut qui aut itaque et commodi vel."},"example":"Sapiente et."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Excepturi aut consequatur animi rerum.","format":"binary"},"example":"Dolorem illo officiis ipsa impedit harum et."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."}}}}}}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"schema":{"type":"string","description":"Unique task identifier.","example":"Delectus natus eos."},"example":"Quae ut dolores ab."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","example":"Illum quidem sapiente sed velit.","format":"binary"},"example":"Omnis commodi reiciendis eum non."}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Dolores atque error ab."}},"example":{"taskID":"Laboriosam perspiciatis vitae numquam."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Voluptatem iure qui facilis aut."},"example":"Excepturi non."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Impedit iste suscipit.","format":"binary"},"example":"Excepturi in ex ratione."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateTaskResult"},"example":{"taskID":"Vel odio et doloribus est quod laborum."}}}}}}},"/v1/taskList/{taskListName}":{"post":{"tags":["taskList"],"summary":"Create taskList","description":"Create a task list, corresponding groups and tasks and put them in respective queues for execution.","operationId":"taskList#Create","parameters":[{"name":"taskListName","in":"path","description":"TaskList name.","required":true,"schema":{"type":"string","description":"TaskList name.","example":"Incidunt autem eaque."},"example":"Fugit ut eius sint earum."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for taskList execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for taskList execution.","example":"Reiciendis numquam.","format":"binary"},"example":"At consequatur nulla praesentium totam dolores voluptas."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateTaskListResult"},"example":{"taskListID":"Quaerat ut fugit voluptatem dolores deserunt in."}}}}}}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"schema":{"type":"string","description":"Unique task identifier.","example":"Ut et est aut quae magnam."},"example":"Amet sapiente qui non."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","example":"Placeat molestias praesentium necessitatibus sed.","format":"binary"},"example":"Sint nulla."}}}}}}},"components":{"schemas":{"CreateTaskListResult":{"type":"object","properties":{"taskListID":{"type":"string","description":"Unique taskList identifier.","example":"Doloribus ullam voluptas quos aut tempore."}},"example":{"taskListID":"Porro perspiciatis qui vitae totam eligendi officiis."},"required":["taskListID"]},"CreateTaskResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facilis distinctio asperiores ut architecto ducimus."}},"example":{"taskID":"Omnis et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."},{"name":"taskList","description":"TaskList service provides endpoints to work with task lists."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 7822203e80727be6ad349a505c7520f5f53bf1be..3b234d0b2694c25ae7b41a466c4a52329a6b48d7 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -40,8 +40,8 @@ paths: schema: type: string description: Task name. - example: Aut qui aut itaque et commodi vel. - example: Sapiente et. + example: Voluptatem iure qui facilis aut. + example: Excepturi non. - name: x-cache-namespace in: header description: Cache key namespace @@ -68,18 +68,75 @@ paths: schema: type: string description: Data contains JSON payload that will be used for task execution. - example: Excepturi aut consequatur animi rerum. + example: Impedit iste suscipit. format: binary - example: Dolorem illo officiis ipsa impedit harum et. + example: Excepturi in ex ratione. responses: "200": description: OK response. content: application/json: schema: - $ref: '#/components/schemas/CreateResult' + $ref: '#/components/schemas/CreateTaskResult' example: - taskID: Corrupti quia autem dolorum sunt aperiam quaerat. + taskID: Vel odio et doloribus est quod laborum. + /v1/taskList/{taskListName}: + post: + tags: + - taskList + summary: Create taskList + description: Create a task list, corresponding groups and tasks and put them + in respective queues for execution. + operationId: taskList#Create + parameters: + - name: taskListName + in: path + description: TaskList name. + required: true + schema: + type: string + description: TaskList name. + example: Incidunt autem eaque. + example: Fugit ut eius sint earum. + - name: x-cache-namespace + in: header + description: Cache key namespace + allowEmptyValue: true + schema: + type: string + description: Cache key namespace + example: login + example: login + - name: x-cache-scope + in: header + description: Cache key scope + allowEmptyValue: true + schema: + type: string + description: Cache key scope + example: user + example: user + requestBody: + description: Data contains JSON payload that will be used for taskList execution. + required: true + content: + application/json: + schema: + type: string + description: Data contains JSON payload that will be used for taskList + execution. + example: Reiciendis numquam. + format: binary + example: At consequatur nulla praesentium totam dolores voluptas. + responses: + "200": + description: OK response. + content: + application/json: + schema: + $ref: '#/components/schemas/CreateTaskListResult' + example: + taskListID: Quaerat ut fugit voluptatem dolores deserunt in. /v1/taskResult/{taskID}: get: tags: @@ -95,8 +152,8 @@ paths: schema: type: string description: Unique task identifier. - example: Delectus natus eos. - example: Quae ut dolores ab. + example: Ut et est aut quae magnam. + example: Amet sapiente qui non. responses: "200": description: OK response. @@ -104,20 +161,31 @@ paths: application/json: schema: type: string - example: Illum quidem sapiente sed velit. + example: Placeat molestias praesentium necessitatibus sed. format: binary - example: Omnis commodi reiciendis eum non. + example: Sint nulla. components: schemas: - CreateResult: + CreateTaskListResult: + type: object + properties: + taskListID: + type: string + description: Unique taskList identifier. + example: Doloribus ullam voluptas quos aut tempore. + example: + taskListID: Porro perspiciatis qui vitae totam eligendi officiis. + required: + - taskListID + CreateTaskResult: type: object properties: taskID: type: string description: Unique task identifier. - example: Dolores atque error ab. + example: Facilis distinctio asperiores ut architecto ducimus. example: - taskID: Laboriosam perspiciatis vitae numquam. + taskID: Omnis et. required: - taskID tags: @@ -125,3 +193,5 @@ tags: description: Health service provides health check endpoints. - name: task description: Task service provides endpoints to work with tasks. +- name: taskList + description: TaskList service provides endpoints to work with task lists. diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go index f4e478f9179ff782d6cf169ef51804e1f7d6eed9..3080e4ebc945b6f4f29af532b072a18c0ad3094c 100644 --- a/gen/http/task/client/cli.go +++ b/gen/http/task/client/cli.go @@ -16,13 +16,13 @@ import ( // BuildCreatePayload builds the payload for the task Create endpoint from CLI // flags. -func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateRequest, error) { +func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateTaskRequest, error) { var err error var body interface{} { err = json.Unmarshal([]byte(taskCreateBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Eaque excepturi suscipit veritatis nemo.\"") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Ipsam et est accusantium.\"") } } var taskName string @@ -42,7 +42,7 @@ func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCr } } v := body - res := &task.CreateRequest{ + res := &task.CreateTaskRequest{ Data: v, } res.TaskName = taskName diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go index a7ecbf5caea14083c7bcfdac4483920e08fa1835..1fd5f926ececb598eb85c0a940cc1090d0577690 100644 --- a/gen/http/task/client/encode_decode.go +++ b/gen/http/task/client/encode_decode.go @@ -25,9 +25,9 @@ func (c *Client) BuildCreateRequest(ctx context.Context, v interface{}) (*http.R taskName string ) { - p, ok := v.(*task.CreateRequest) + p, ok := v.(*task.CreateTaskRequest) if !ok { - return nil, goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v) + return nil, goahttp.ErrInvalidType("task", "Create", "*task.CreateTaskRequest", v) } taskName = p.TaskName } @@ -47,9 +47,9 @@ func (c *Client) BuildCreateRequest(ctx context.Context, v interface{}) (*http.R // server. func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http.Request, interface{}) error { return func(req *http.Request, v interface{}) error { - p, ok := v.(*task.CreateRequest) + p, ok := v.(*task.CreateTaskRequest) if !ok { - return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v) + return goahttp.ErrInvalidType("task", "Create", "*task.CreateTaskRequest", v) } if p.CacheNamespace != nil { head := *p.CacheNamespace @@ -98,7 +98,7 @@ func DecodeCreateResponse(decoder func(*http.Response) goahttp.Decoder, restoreB if err != nil { return nil, goahttp.ErrValidationError("task", "Create", err) } - res := NewCreateResultOK(&body) + res := NewCreateTaskResultOK(&body) return res, nil default: body, _ := ioutil.ReadAll(resp.Body) diff --git a/gen/http/task/client/types.go b/gen/http/task/client/types.go index 7f5244d65480df67bf86504d92affb4d836124ae..2b4b14fee01e71ca55048514cafe37f945455e33 100644 --- a/gen/http/task/client/types.go +++ b/gen/http/task/client/types.go @@ -19,10 +19,10 @@ type CreateResponseBody struct { TaskID *string `form:"taskID,omitempty" json:"taskID,omitempty" xml:"taskID,omitempty"` } -// NewCreateResultOK builds a "task" service "Create" endpoint result from a -// HTTP "OK" response. -func NewCreateResultOK(body *CreateResponseBody) *task.CreateResult { - v := &task.CreateResult{ +// NewCreateTaskResultOK builds a "task" service "Create" endpoint result from +// a HTTP "OK" response. +func NewCreateTaskResultOK(body *CreateResponseBody) *task.CreateTaskResult { + v := &task.CreateTaskResult{ TaskID: *body.TaskID, } diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go index 716de0ada1b13cec9bb4401ef6e06a8ec19b3a26..309b093acd6f4ed65e7126e9d0dee1af70c33574 100644 --- a/gen/http/task/server/encode_decode.go +++ b/gen/http/task/server/encode_decode.go @@ -21,7 +21,7 @@ import ( // Create endpoint. func EncodeCreateResponse(encoder func(context.Context, http.ResponseWriter) goahttp.Encoder) func(context.Context, http.ResponseWriter, interface{}) error { return func(ctx context.Context, w http.ResponseWriter, v interface{}) error { - res, _ := v.(*task.CreateResult) + res, _ := v.(*task.CreateTaskResult) enc := encoder(ctx, w) body := NewCreateResponseBody(res) w.WriteHeader(http.StatusOK) @@ -61,7 +61,7 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. if cacheScopeRaw != "" { cacheScope = &cacheScopeRaw } - payload := NewCreateRequest(body, taskName, cacheNamespace, cacheScope) + payload := NewCreateTaskRequest(body, taskName, cacheNamespace, cacheScope) return payload, nil } diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go index c1482c9a8bc09f8781fd87e399293afa5c4392c6..b5a3db61fe4375f8ab37e32ec2d46a279144f211 100644 --- a/gen/http/task/server/types.go +++ b/gen/http/task/server/types.go @@ -20,17 +20,17 @@ type CreateResponseBody struct { // NewCreateResponseBody builds the HTTP response body from the result of the // "Create" endpoint of the "task" service. -func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody { +func NewCreateResponseBody(res *task.CreateTaskResult) *CreateResponseBody { body := &CreateResponseBody{ TaskID: res.TaskID, } return body } -// NewCreateRequest builds a task service Create endpoint payload. -func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateRequest { +// NewCreateTaskRequest builds a task service Create endpoint payload. +func NewCreateTaskRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateTaskRequest { v := body - res := &task.CreateRequest{ + res := &task.CreateTaskRequest{ Data: v, } res.TaskName = taskName diff --git a/gen/http/task_list/client/cli.go b/gen/http/task_list/client/cli.go new file mode 100644 index 0000000000000000000000000000000000000000..a46f7e6b65abf426b31ebafbdd65df0c52b216a8 --- /dev/null +++ b/gen/http/task_list/client/cli.go @@ -0,0 +1,53 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP client CLI support package +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package client + +import ( + "encoding/json" + "fmt" + + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" +) + +// BuildCreatePayload builds the payload for the taskList Create endpoint from +// CLI flags. +func BuildCreatePayload(taskListCreateBody string, taskListCreateTaskListName string, taskListCreateCacheNamespace string, taskListCreateCacheScope string) (*tasklist.CreateTaskListRequest, error) { + var err error + var body interface{} + { + err = json.Unmarshal([]byte(taskListCreateBody), &body) + if err != nil { + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Rerum quod error est esse nisi.\"") + } + } + var taskListName string + { + taskListName = taskListCreateTaskListName + } + var cacheNamespace *string + { + if taskListCreateCacheNamespace != "" { + cacheNamespace = &taskListCreateCacheNamespace + } + } + var cacheScope *string + { + if taskListCreateCacheScope != "" { + cacheScope = &taskListCreateCacheScope + } + } + v := body + res := &tasklist.CreateTaskListRequest{ + Data: v, + } + res.TaskListName = taskListName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope + + return res, nil +} diff --git a/gen/http/task_list/client/client.go b/gen/http/task_list/client/client.go new file mode 100644 index 0000000000000000000000000000000000000000..7beb029b37d637bc6fc7ca2249ee7a84e2d100fe --- /dev/null +++ b/gen/http/task_list/client/client.go @@ -0,0 +1,74 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList client HTTP transport +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package client + +import ( + "context" + "net/http" + + goahttp "goa.design/goa/v3/http" + goa "goa.design/goa/v3/pkg" +) + +// Client lists the taskList service endpoint HTTP clients. +type Client struct { + // Create Doer is the HTTP client used to make requests to the Create endpoint. + CreateDoer goahttp.Doer + + // RestoreResponseBody controls whether the response bodies are reset after + // decoding so they can be read again. + RestoreResponseBody bool + + scheme string + host string + encoder func(*http.Request) goahttp.Encoder + decoder func(*http.Response) goahttp.Decoder +} + +// NewClient instantiates HTTP clients for all the taskList service servers. +func NewClient( + scheme string, + host string, + doer goahttp.Doer, + enc func(*http.Request) goahttp.Encoder, + dec func(*http.Response) goahttp.Decoder, + restoreBody bool, +) *Client { + return &Client{ + CreateDoer: doer, + RestoreResponseBody: restoreBody, + scheme: scheme, + host: host, + decoder: dec, + encoder: enc, + } +} + +// Create returns an endpoint that makes HTTP requests to the taskList service +// Create server. +func (c *Client) Create() goa.Endpoint { + var ( + encodeRequest = EncodeCreateRequest(c.encoder) + decodeResponse = DecodeCreateResponse(c.decoder, c.RestoreResponseBody) + ) + return func(ctx context.Context, v interface{}) (interface{}, error) { + req, err := c.BuildCreateRequest(ctx, v) + if err != nil { + return nil, err + } + err = encodeRequest(req, v) + if err != nil { + return nil, err + } + resp, err := c.CreateDoer.Do(req) + if err != nil { + return nil, goahttp.ErrRequestError("taskList", "Create", err) + } + return decodeResponse(resp) + } +} diff --git a/gen/http/task_list/client/encode_decode.go b/gen/http/task_list/client/encode_decode.go new file mode 100644 index 0000000000000000000000000000000000000000..a8c5892c07ad6acdd709ff1f6875b1138eba762a --- /dev/null +++ b/gen/http/task_list/client/encode_decode.go @@ -0,0 +1,108 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP client encoders and decoders +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package client + +import ( + "bytes" + "context" + "io/ioutil" + "net/http" + "net/url" + + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + goahttp "goa.design/goa/v3/http" +) + +// BuildCreateRequest instantiates a HTTP request object with method and path +// set to call the "taskList" service "Create" endpoint +func (c *Client) BuildCreateRequest(ctx context.Context, v interface{}) (*http.Request, error) { + var ( + taskListName string + ) + { + p, ok := v.(*tasklist.CreateTaskListRequest) + if !ok { + return nil, goahttp.ErrInvalidType("taskList", "Create", "*tasklist.CreateTaskListRequest", v) + } + taskListName = p.TaskListName + } + u := &url.URL{Scheme: c.scheme, Host: c.host, Path: CreateTaskListPath(taskListName)} + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, goahttp.ErrInvalidURL("taskList", "Create", u.String(), err) + } + if ctx != nil { + req = req.WithContext(ctx) + } + + return req, nil +} + +// EncodeCreateRequest returns an encoder for requests sent to the taskList +// Create server. +func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http.Request, interface{}) error { + return func(req *http.Request, v interface{}) error { + p, ok := v.(*tasklist.CreateTaskListRequest) + if !ok { + return goahttp.ErrInvalidType("taskList", "Create", "*tasklist.CreateTaskListRequest", v) + } + if p.CacheNamespace != nil { + head := *p.CacheNamespace + req.Header.Set("x-cache-namespace", head) + } + if p.CacheScope != nil { + head := *p.CacheScope + req.Header.Set("x-cache-scope", head) + } + body := p.Data + if err := encoder(req).Encode(&body); err != nil { + return goahttp.ErrEncodingError("taskList", "Create", err) + } + return nil + } +} + +// DecodeCreateResponse returns a decoder for responses returned by the +// taskList Create endpoint. restoreBody controls whether the response body +// should be restored after having been read. +func DecodeCreateResponse(decoder func(*http.Response) goahttp.Decoder, restoreBody bool) func(*http.Response) (interface{}, error) { + return func(resp *http.Response) (interface{}, error) { + if restoreBody { + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + defer func() { + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + }() + } else { + defer resp.Body.Close() + } + switch resp.StatusCode { + case http.StatusOK: + var ( + body CreateResponseBody + err error + ) + err = decoder(resp).Decode(&body) + if err != nil { + return nil, goahttp.ErrDecodingError("taskList", "Create", err) + } + err = ValidateCreateResponseBody(&body) + if err != nil { + return nil, goahttp.ErrValidationError("taskList", "Create", err) + } + res := NewCreateTaskListResultOK(&body) + return res, nil + default: + body, _ := ioutil.ReadAll(resp.Body) + return nil, goahttp.ErrInvalidResponse("taskList", "Create", resp.StatusCode, string(body)) + } + } +} diff --git a/gen/http/task_list/client/paths.go b/gen/http/task_list/client/paths.go new file mode 100644 index 0000000000000000000000000000000000000000..ce764600818ffe4a5a0df13ee8cdd75e3db92428 --- /dev/null +++ b/gen/http/task_list/client/paths.go @@ -0,0 +1,17 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// HTTP request path constructors for the taskList service. +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package client + +import ( + "fmt" +) + +// CreateTaskListPath returns the URL path to the taskList service Create HTTP endpoint. +func CreateTaskListPath(taskListName string) string { + return fmt.Sprintf("/v1/taskList/%v", taskListName) +} diff --git a/gen/http/task_list/client/types.go b/gen/http/task_list/client/types.go new file mode 100644 index 0000000000000000000000000000000000000000..902273026c2e1b2237fc0b70252f96669b7e42c1 --- /dev/null +++ b/gen/http/task_list/client/types.go @@ -0,0 +1,38 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP client types +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package client + +import ( + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + goa "goa.design/goa/v3/pkg" +) + +// CreateResponseBody is the type of the "taskList" service "Create" endpoint +// HTTP response body. +type CreateResponseBody struct { + // Unique taskList identifier. + TaskListID *string `form:"taskListID,omitempty" json:"taskListID,omitempty" xml:"taskListID,omitempty"` +} + +// NewCreateTaskListResultOK builds a "taskList" service "Create" endpoint +// result from a HTTP "OK" response. +func NewCreateTaskListResultOK(body *CreateResponseBody) *tasklist.CreateTaskListResult { + v := &tasklist.CreateTaskListResult{ + TaskListID: *body.TaskListID, + } + + return v +} + +// ValidateCreateResponseBody runs the validations defined on CreateResponseBody +func ValidateCreateResponseBody(body *CreateResponseBody) (err error) { + if body.TaskListID == nil { + err = goa.MergeErrors(err, goa.MissingFieldError("taskListID", "body")) + } + return +} diff --git a/gen/http/task_list/server/encode_decode.go b/gen/http/task_list/server/encode_decode.go new file mode 100644 index 0000000000000000000000000000000000000000..be27fbb71c3e3e8a5cfc1859b3aabb3352be4fab --- /dev/null +++ b/gen/http/task_list/server/encode_decode.go @@ -0,0 +1,68 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP server encoders and decoders +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package server + +import ( + "context" + "io" + "net/http" + + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + goahttp "goa.design/goa/v3/http" + goa "goa.design/goa/v3/pkg" +) + +// EncodeCreateResponse returns an encoder for responses returned by the +// taskList Create endpoint. +func EncodeCreateResponse(encoder func(context.Context, http.ResponseWriter) goahttp.Encoder) func(context.Context, http.ResponseWriter, interface{}) error { + return func(ctx context.Context, w http.ResponseWriter, v interface{}) error { + res, _ := v.(*tasklist.CreateTaskListResult) + enc := encoder(ctx, w) + body := NewCreateResponseBody(res) + w.WriteHeader(http.StatusOK) + return enc.Encode(body) + } +} + +// DecodeCreateRequest returns a decoder for requests sent to the taskList +// Create endpoint. +func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp.Decoder) func(*http.Request) (interface{}, error) { + return func(r *http.Request) (interface{}, error) { + var ( + body interface{} + err error + ) + err = decoder(r).Decode(&body) + if err != nil { + if err == io.EOF { + return nil, goa.MissingPayloadError() + } + return nil, goa.DecodePayloadError(err.Error()) + } + + var ( + taskListName string + cacheNamespace *string + cacheScope *string + + params = mux.Vars(r) + ) + taskListName = params["taskListName"] + cacheNamespaceRaw := r.Header.Get("x-cache-namespace") + if cacheNamespaceRaw != "" { + cacheNamespace = &cacheNamespaceRaw + } + cacheScopeRaw := r.Header.Get("x-cache-scope") + if cacheScopeRaw != "" { + cacheScope = &cacheScopeRaw + } + payload := NewCreateTaskListRequest(body, taskListName, cacheNamespace, cacheScope) + + return payload, nil + } +} diff --git a/gen/http/task_list/server/paths.go b/gen/http/task_list/server/paths.go new file mode 100644 index 0000000000000000000000000000000000000000..80662ac5bb501534cf2f22138127b98eb86bcece --- /dev/null +++ b/gen/http/task_list/server/paths.go @@ -0,0 +1,17 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// HTTP request path constructors for the taskList service. +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package server + +import ( + "fmt" +) + +// CreateTaskListPath returns the URL path to the taskList service Create HTTP endpoint. +func CreateTaskListPath(taskListName string) string { + return fmt.Sprintf("/v1/taskList/%v", taskListName) +} diff --git a/gen/http/task_list/server/server.go b/gen/http/task_list/server/server.go new file mode 100644 index 0000000000000000000000000000000000000000..4d4d72dcec6b49130f5823ceb8148687eb4ccab0 --- /dev/null +++ b/gen/http/task_list/server/server.go @@ -0,0 +1,131 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP server +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package server + +import ( + "context" + "net/http" + + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + goahttp "goa.design/goa/v3/http" + goa "goa.design/goa/v3/pkg" +) + +// Server lists the taskList service endpoint HTTP handlers. +type Server struct { + Mounts []*MountPoint + Create http.Handler +} + +// ErrorNamer is an interface implemented by generated error structs that +// exposes the name of the error as defined in the design. +type ErrorNamer interface { + ErrorName() string +} + +// MountPoint holds information about the mounted endpoints. +type MountPoint struct { + // Method is the name of the service method served by the mounted HTTP handler. + Method string + // Verb is the HTTP method used to match requests to the mounted handler. + Verb string + // Pattern is the HTTP request path pattern used to match requests to the + // mounted handler. + Pattern string +} + +// New instantiates HTTP handlers for all the taskList service endpoints using +// the provided encoder and decoder. The handlers are mounted on the given mux +// using the HTTP verb and path defined in the design. errhandler is called +// whenever a response fails to be encoded. formatter is used to format errors +// returned by the service methods prior to encoding. Both errhandler and +// formatter are optional and can be nil. +func New( + e *tasklist.Endpoints, + mux goahttp.Muxer, + decoder func(*http.Request) goahttp.Decoder, + encoder func(context.Context, http.ResponseWriter) goahttp.Encoder, + errhandler func(context.Context, http.ResponseWriter, error), + formatter func(err error) goahttp.Statuser, +) *Server { + return &Server{ + Mounts: []*MountPoint{ + {"Create", "POST", "/v1/taskList/{taskListName}"}, + }, + Create: NewCreateHandler(e.Create, mux, decoder, encoder, errhandler, formatter), + } +} + +// Service returns the name of the service served. +func (s *Server) Service() string { return "taskList" } + +// Use wraps the server handlers with the given middleware. +func (s *Server) Use(m func(http.Handler) http.Handler) { + s.Create = m(s.Create) +} + +// Mount configures the mux to serve the taskList endpoints. +func Mount(mux goahttp.Muxer, h *Server) { + MountCreateHandler(mux, h.Create) +} + +// Mount configures the mux to serve the taskList endpoints. +func (s *Server) Mount(mux goahttp.Muxer) { + Mount(mux, s) +} + +// MountCreateHandler configures the mux to serve the "taskList" service +// "Create" endpoint. +func MountCreateHandler(mux goahttp.Muxer, h http.Handler) { + f, ok := h.(http.HandlerFunc) + if !ok { + f = func(w http.ResponseWriter, r *http.Request) { + h.ServeHTTP(w, r) + } + } + mux.Handle("POST", "/v1/taskList/{taskListName}", f) +} + +// NewCreateHandler creates a HTTP handler which loads the HTTP request and +// calls the "taskList" service "Create" endpoint. +func NewCreateHandler( + endpoint goa.Endpoint, + mux goahttp.Muxer, + decoder func(*http.Request) goahttp.Decoder, + encoder func(context.Context, http.ResponseWriter) goahttp.Encoder, + errhandler func(context.Context, http.ResponseWriter, error), + formatter func(err error) goahttp.Statuser, +) http.Handler { + var ( + decodeRequest = DecodeCreateRequest(mux, decoder) + encodeResponse = EncodeCreateResponse(encoder) + encodeError = goahttp.ErrorEncoder(encoder, formatter) + ) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), goahttp.AcceptTypeKey, r.Header.Get("Accept")) + ctx = context.WithValue(ctx, goa.MethodKey, "Create") + ctx = context.WithValue(ctx, goa.ServiceKey, "taskList") + payload, err := decodeRequest(r) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + res, err := endpoint(ctx, payload) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + if err := encodeResponse(ctx, w, res); err != nil { + errhandler(ctx, w, err) + } + }) +} diff --git a/gen/http/task_list/server/types.go b/gen/http/task_list/server/types.go new file mode 100644 index 0000000000000000000000000000000000000000..879b0a1fa38948d6d92357413004176b75adea8c --- /dev/null +++ b/gen/http/task_list/server/types.go @@ -0,0 +1,41 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList HTTP server types +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package server + +import ( + tasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" +) + +// CreateResponseBody is the type of the "taskList" service "Create" endpoint +// HTTP response body. +type CreateResponseBody struct { + // Unique taskList identifier. + TaskListID string `form:"taskListID" json:"taskListID" xml:"taskListID"` +} + +// NewCreateResponseBody builds the HTTP response body from the result of the +// "Create" endpoint of the "taskList" service. +func NewCreateResponseBody(res *tasklist.CreateTaskListResult) *CreateResponseBody { + body := &CreateResponseBody{ + TaskListID: res.TaskListID, + } + return body +} + +// NewCreateTaskListRequest builds a taskList service Create endpoint payload. +func NewCreateTaskListRequest(body interface{}, taskListName string, cacheNamespace *string, cacheScope *string) *tasklist.CreateTaskListRequest { + v := body + res := &tasklist.CreateTaskListRequest{ + Data: v, + } + res.TaskListName = taskListName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope + + return res +} diff --git a/gen/task/client.go b/gen/task/client.go index 3f812485b1c4b9ff706e6278e0bb385f7f4da636..55b4c3000e146d898a66155c3092df2732eee219 100644 --- a/gen/task/client.go +++ b/gen/task/client.go @@ -28,13 +28,13 @@ func NewClient(create, taskResult goa.Endpoint) *Client { } // Create calls the "Create" endpoint of the "task" service. -func (c *Client) Create(ctx context.Context, p *CreateRequest) (res *CreateResult, err error) { +func (c *Client) Create(ctx context.Context, p *CreateTaskRequest) (res *CreateTaskResult, err error) { var ires interface{} ires, err = c.CreateEndpoint(ctx, p) if err != nil { return } - return ires.(*CreateResult), nil + return ires.(*CreateTaskResult), nil } // TaskResult calls the "TaskResult" endpoint of the "task" service. diff --git a/gen/task/endpoints.go b/gen/task/endpoints.go index b648dd83cb34f626e54dbfba734439cbfbe813ca..5ea36902f9d0493c4337209bcb40a9899537111e 100644 --- a/gen/task/endpoints.go +++ b/gen/task/endpoints.go @@ -37,7 +37,7 @@ func (e *Endpoints) Use(m func(goa.Endpoint) goa.Endpoint) { // "Create" of service "task". func NewCreateEndpoint(s Service) goa.Endpoint { return func(ctx context.Context, req interface{}) (interface{}, error) { - p := req.(*CreateRequest) + p := req.(*CreateTaskRequest) return s.Create(ctx, p) } } diff --git a/gen/task/service.go b/gen/task/service.go index f5c4b6954680960745e92c7556d1f243413b7647..81ff7d1ff5b93eb7cb3b05e00d55b30cfbefa0f1 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -14,7 +14,7 @@ import ( // Task service provides endpoints to work with tasks. type Service interface { // Create a task and put it in a queue for execution. - Create(context.Context, *CreateRequest) (res *CreateResult, err error) + Create(context.Context, *CreateTaskRequest) (res *CreateTaskResult, err error) // TaskResult retrieves task result from the Cache service. TaskResult(context.Context, *TaskResultRequest) (res interface{}, err error) } @@ -29,8 +29,8 @@ const ServiceName = "task" // MethodKey key. var MethodNames = [2]string{"Create", "TaskResult"} -// CreateRequest is the payload type of the task service Create method. -type CreateRequest struct { +// CreateTaskRequest is the payload type of the task service Create method. +type CreateTaskRequest struct { // Task name. TaskName string // Data contains JSON payload that will be used for task execution. @@ -41,8 +41,8 @@ type CreateRequest struct { CacheScope *string } -// CreateResult is the result type of the task service Create method. -type CreateResult struct { +// CreateTaskResult is the result type of the task service Create method. +type CreateTaskResult struct { // Unique task identifier. TaskID string } diff --git a/gen/task_list/client.go b/gen/task_list/client.go new file mode 100644 index 0000000000000000000000000000000000000000..dcaf4ef0bdab346d90e25b0f4773756a72c10f42 --- /dev/null +++ b/gen/task_list/client.go @@ -0,0 +1,36 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList client +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package tasklist + +import ( + "context" + + goa "goa.design/goa/v3/pkg" +) + +// Client is the "taskList" service client. +type Client struct { + CreateEndpoint goa.Endpoint +} + +// NewClient initializes a "taskList" service client given the endpoints. +func NewClient(create goa.Endpoint) *Client { + return &Client{ + CreateEndpoint: create, + } +} + +// Create calls the "Create" endpoint of the "taskList" service. +func (c *Client) Create(ctx context.Context, p *CreateTaskListRequest) (res *CreateTaskListResult, err error) { + var ires interface{} + ires, err = c.CreateEndpoint(ctx, p) + if err != nil { + return + } + return ires.(*CreateTaskListResult), nil +} diff --git a/gen/task_list/endpoints.go b/gen/task_list/endpoints.go new file mode 100644 index 0000000000000000000000000000000000000000..0a2bc0e4b32a0704d6cf429df56fd1c5823cf708 --- /dev/null +++ b/gen/task_list/endpoints.go @@ -0,0 +1,40 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList endpoints +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package tasklist + +import ( + "context" + + goa "goa.design/goa/v3/pkg" +) + +// Endpoints wraps the "taskList" service endpoints. +type Endpoints struct { + Create goa.Endpoint +} + +// NewEndpoints wraps the methods of the "taskList" service with endpoints. +func NewEndpoints(s Service) *Endpoints { + return &Endpoints{ + Create: NewCreateEndpoint(s), + } +} + +// Use applies the given middleware to all the "taskList" service endpoints. +func (e *Endpoints) Use(m func(goa.Endpoint) goa.Endpoint) { + e.Create = m(e.Create) +} + +// NewCreateEndpoint returns an endpoint function that calls the method +// "Create" of service "taskList". +func NewCreateEndpoint(s Service) goa.Endpoint { + return func(ctx context.Context, req interface{}) (interface{}, error) { + p := req.(*CreateTaskListRequest) + return s.Create(ctx, p) + } +} diff --git a/gen/task_list/service.go b/gen/task_list/service.go new file mode 100644 index 0000000000000000000000000000000000000000..e7dbf3ae62c41719450b9ba97002b00c184f3fce --- /dev/null +++ b/gen/task_list/service.go @@ -0,0 +1,49 @@ +// Code generated by goa v3.7.0, DO NOT EDIT. +// +// taskList service +// +// Command: +// $ goa gen code.vereign.com/gaiax/tsa/task/design + +package tasklist + +import ( + "context" +) + +// TaskList service provides endpoints to work with task lists. +type Service interface { + // Create a task list, corresponding groups and tasks and put them in + // respective queues for execution. + Create(context.Context, *CreateTaskListRequest) (res *CreateTaskListResult, err error) +} + +// ServiceName is the name of the service as defined in the design. This is the +// same value that is set in the endpoint request contexts under the ServiceKey +// key. +const ServiceName = "taskList" + +// MethodNames lists the service method names as defined in the design. These +// are the same values that are set in the endpoint request contexts under the +// MethodKey key. +var MethodNames = [1]string{"Create"} + +// CreateTaskListRequest is the payload type of the taskList service Create +// method. +type CreateTaskListRequest struct { + // TaskList name. + TaskListName string + // Data contains JSON payload that will be used for taskList execution. + Data interface{} + // Cache key namespace. + CacheNamespace *string + // Cache key scope. + CacheScope *string +} + +// CreateTaskListResult is the result type of the taskList service Create +// method. +type CreateTaskListResult struct { + // Unique taskList identifier. + TaskListID string +} diff --git a/internal/config/config.go b/internal/config/config.go index f68e90ba1d3c58f86e468b9b72b4e60e791d6d91..42b6cce22080f379c5bdc7095eee55bb894df94c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,11 +3,12 @@ package config import "time" type Config struct { - HTTP httpConfig - Mongo mongoConfig - Policy policyConfig - Executor executorConfig - Cache cacheConfig + HTTP httpConfig + Mongo mongoConfig + Policy policyConfig + Executor executorConfig + ListExecutor listExecutorConfig + Cache cacheConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -35,6 +36,11 @@ type executorConfig struct { PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` } +type listExecutorConfig struct { + Workers int `envconfig:"LIST_EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"LIST_EXECUTOR_POLL_INTERVAL" default:"1s"` +} + type cacheConfig struct { Addr string `envconfig:"CACHE_ADDR" required:"true"` } diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go new file mode 100644 index 0000000000000000000000000000000000000000..530e0eee6c67926ef6f7dab60241daad6c861b04 --- /dev/null +++ b/internal/listexecutor/listexecutor.go @@ -0,0 +1,297 @@ +package listexecutor + +import ( + "bytes" + "context" + "io" + "net/http" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" +) + +type token struct{} + +const ( + sequential = "sequential" + parallel = "parallel" +) + +// Policy client. +type Policy interface { + Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) +} + +// Queue allows retrieving, returning and deleting taskLists and group tasks from storage. +type Queue interface { + PollList(ctx context.Context) (*tasklist.TaskList, error) + AckList(ctx context.Context, taskList *tasklist.TaskList) error + AckGroupTasks(ctx context.Context, group *tasklist.Group) error +} + +type Storage interface { + GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*taskpkg.Task, error) + SaveTaskHistory(ctx context.Context, task *taskpkg.Task) error + SaveTaskListHistory(ctx context.Context, task *tasklist.TaskList) error +} + +type Cache interface { + Set(ctx context.Context, key, namespace, scope string, value []byte) error + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + +type ListExecutor struct { + queue Queue + policy Policy + storage Storage + cache Cache + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New( + queue Queue, + policy Policy, + storage Storage, + cache Cache, + workers int, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *ListExecutor { + return &ListExecutor{ + queue: queue, + policy: policy, + storage: storage, + cache: cache, + workers: workers, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, + } +} + +func (l *ListExecutor) Start(ctx context.Context) error { + defer l.logger.Info("taskList executor stopped") + + // buffered channel used as a semaphore to limit concurrent executions + sem := make(chan token, l.workers) + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(l.pollInterval): + sem <- token{} // acquire a semaphore + + taskList, err := l.queue.PollList(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + l.logger.Error("error getting taskList from queue", zap.Error(err)) + } + <-sem // release the semaphore + continue + } + + go func(list *tasklist.TaskList) { + l.Execute(ctx, list) + <-sem // release the semaphore + }(taskList) + } + } + + // wait for completion + for n := l.workers; n > 0; n-- { + sem <- token{} + } + + return ctx.Err() +} + +func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { + logger := l.logger.With( + zap.String("taskListID", list.ID), + zap.String("taskListName", list.Name), + ) + list.State = taskpkg.Pending + list.StartedAt = time.Now() + + // execute groups sequentially + for i := range list.Groups { + if err := l.executeGroup(ctx, &list.Groups[i]); err != nil { + logger.Error("error executing group", zap.Error(err)) + list.Groups[i].State = taskpkg.Failed + list.State = taskpkg.Failed + } + } + + if list.State != taskpkg.Failed { + list.State = taskpkg.Done + } + list.FinishedAt = time.Now() + + if err := l.storage.SaveTaskListHistory(ctx, list); err != nil { + logger.Error("error saving taskList history", zap.Error(err)) + } else { + logger.Debug("taskList history is saved") + } + + if err := l.queue.AckList(ctx, list); err != nil { + logger.Error("failed to ack taskList in queue", zap.Error(err)) + } +} + +func (l *ListExecutor) executeGroup(ctx context.Context, group *tasklist.Group) error { + switch exec := group.Execution; exec { + case sequential: + return l.executeSequential(ctx, group) + case parallel: + return errors.New("not implemented") + } + + return errors.New("unknown type of group execution") +} + +func (l *ListExecutor) executeSequential(ctx context.Context, group *tasklist.Group) error { + group.State = taskpkg.Pending + + tasks, err := l.storage.GetGroupTasks(ctx, group) + if err != nil { + return err + } + + req := group.Request + for _, task := range tasks { + logger := l.logger.With( + zap.String("taskID", task.ID), + zap.String("taskName", task.Name), + ) + + // mark all subsequent tasks as failed if one task already failed + if group.State == taskpkg.Failed { + task.State = taskpkg.Failed + continue + } + + task.Request = req + err := l.executeTask(ctx, task) + if err != nil { + task.State = taskpkg.Failed + group.State = taskpkg.Failed + logger.Error("error executing task", zap.Error(err)) + continue + } + logger.Debug("task execution completed successfully") + + // pass the response from current task as an input to the next task + req = task.Response + + if err := l.cache.Set( + ctx, + task.ID, + task.CacheNamespace, + task.CacheScope, + task.Response, + ); err != nil { + logger.Error("error storing task result in cache", zap.Error(err)) + continue + } + logger.Debug("task results are stored in cache") + + if err := l.storage.SaveTaskHistory(ctx, task); err != nil { + logger.Error("error saving task history", zap.Error(err)) + continue + } + logger.Debug("task history is saved") + } + + // remove tasks from queue + if err := l.queue.AckGroupTasks(ctx, group); err != nil { + l.logger.With(zap.String("groupID", group.ID)).Error("failed to ack group tasks in queue", zap.Error(err)) + } + + if group.State != taskpkg.Failed { + group.State = taskpkg.Done + } + + return nil +} + +func (l *ListExecutor) executeTask(ctx context.Context, task *taskpkg.Task) error { + task.StartedAt = time.Now() + + var response []byte + var err error + + // task is executing a request policy OR an HTTP call to predefined URL + if task.RequestPolicy != "" { + response, err = l.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + if err != nil { + return errors.New("error evaluating request policy", err) + } + task.ResponseCode = http.StatusOK + } else if task.URL != "" && task.Method != "" { + var status int + status, response, err = l.doHTTPTask(ctx, task) + if err != nil { + return err + } + task.ResponseCode = status + } else { + return errors.New(errors.Internal, "invalid task: must define either request policy or url") + } + + task.Response = response + + // evaluate response policy + if task.ResponsePolicy != "" { + resp, err := l.policy.Evaluate(ctx, task.ResponsePolicy, task.Response) + if err != nil { + return errors.New("error evaluating response policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + // evaluate finalizer policy + if task.FinalPolicy != "" { + resp, err := l.policy.Evaluate(ctx, task.FinalPolicy, task.Response) + if err != nil { + return errors.New("error evaluating final policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + task.State = taskpkg.Done + task.FinishedAt = time.Now() + return nil +} + +func (l *ListExecutor) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := l.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() // nolint:errcheck + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil +} diff --git a/internal/service/task/service.go b/internal/service/task/service.go index a4ea25ad30982f806f5b9f1431107321c9601b9d..a7dde0cf2c011b9bd52c2bbc4c5996fbdbaa3f56 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -54,7 +54,7 @@ func New(template Storage, queue Queue, cache Cache, logger *zap.Logger) *Servic } // Create a new task and put it in a Queue for later execution. -func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { +func (s *Service) Create(ctx context.Context, req *goatask.CreateTaskRequest) (res *goatask.CreateTaskResult, err error) { if req.TaskName == "" { return nil, errors.New(errors.BadRequest, "missing taskName") } @@ -92,7 +92,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * return nil, errors.New("failed to create task", err) } - return &goatask.CreateResult{TaskID: task.ID}, nil + return &goatask.CreateTaskResult{TaskID: task.ID}, nil } // TaskResult retrieves task result from the Cache service. @@ -121,7 +121,7 @@ func (s *Service) TaskResult(ctx context.Context, req *goatask.TaskResultRequest } } - if task.State != Done { + if task.State != Done && task.State != Failed { return nil, errors.New(errors.NotFound, "no result, task is not completed") } diff --git a/internal/service/task/service_test.go b/internal/service/task/service_test.go index 70051423452d36b66e184e8d3ea6f28fb504db3b..eef64de7aaf67300f1b19782421cdb3771f14b2b 100644 --- a/internal/service/task/service_test.go +++ b/internal/service/task/service_test.go @@ -22,7 +22,7 @@ func TestNew(t *testing.T) { func TestService_Create(t *testing.T) { tests := []struct { name string - req *goatask.CreateRequest + req *goatask.CreateTaskRequest storage *taskfakes.FakeStorage queue *taskfakes.FakeQueue cache *taskfakes.FakeCache @@ -32,13 +32,13 @@ func TestService_Create(t *testing.T) { }{ { name: "empty task name", - req: &goatask.CreateRequest{}, + req: &goatask.CreateTaskRequest{}, errkind: errors.BadRequest, errtext: "missing taskName", }, { name: "task template not found", - req: &goatask.CreateRequest{TaskName: "taskname"}, + req: &goatask.CreateTaskRequest{TaskName: "taskname"}, storage: &taskfakes.FakeStorage{ TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { return nil, errors.New(errors.NotFound) @@ -49,7 +49,7 @@ func TestService_Create(t *testing.T) { }, { name: "fail to add task to queue", - req: &goatask.CreateRequest{TaskName: "taskname"}, + req: &goatask.CreateTaskRequest{TaskName: "taskname"}, storage: &taskfakes.FakeStorage{ TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { return &task.Task{}, nil @@ -65,7 +65,7 @@ func TestService_Create(t *testing.T) { }, { name: "successfully add task to queue", - req: &goatask.CreateRequest{TaskName: "taskname"}, + req: &goatask.CreateTaskRequest{TaskName: "taskname"}, storage: &taskfakes.FakeStorage{ TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { return &task.Task{}, nil @@ -79,7 +79,7 @@ func TestService_Create(t *testing.T) { }, { name: "successfully add task to queue with namespace and scope", - req: &goatask.CreateRequest{ + req: &goatask.CreateTaskRequest{ TaskName: "taskname", CacheNamespace: ptr.String("login"), CacheScope: ptr.String("user"), diff --git a/internal/service/task/task.go b/internal/service/task/task.go index dd53a5175c505258f3822fa0ee6e8cdd57533dd7..f1d4db423724579cb1a3830488d2c0984e25c11d 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -18,11 +18,15 @@ const ( // Done state is when the task is completed. // TODO(penkovski): do we need this state if task is deleted after it is done? Done = "done" + + // Failed state is when the task execution failed but the task is part + // of a group and later execution is not possible, so it could not be "Unack"-ed + Failed = "failed" ) type Task struct { ID string `json:"id"` // ID is unique task identifier. - GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`. + GroupID string `json:"groupID"` // GroupID is set when the task is part of `tasklist.Group`. Name string `json:"name"` // Name is used by external callers use to create tasks. State State `json:"state"` // State of the task. URL string `json:"url"` // URL against which the task request will be executed (optional). @@ -40,17 +44,6 @@ type Task struct { FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. } -type Group struct { - ID string - Tasks []Task - State State - Metadata interface{} // TODO(penkovski): not yet clear - FinalPolicy string - CreateTime time.Time - StartTime time.Time - FinishTime time.Time -} - // CacheKey constructs the key for storing task result in the cache. func (t *Task) CacheKey() string { key := t.ID diff --git a/internal/service/tasklist/service.go b/internal/service/tasklist/service.go new file mode 100644 index 0000000000000000000000000000000000000000..5542a7d24eace9e0d4782eea171151a4a46b4603 --- /dev/null +++ b/internal/service/tasklist/service.go @@ -0,0 +1,175 @@ +package tasklist + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + goatasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +//go:generate counterfeiter . Storage +//go:generate counterfeiter . Queue + +// Storage for retrieving predefined task templates. +type Storage interface { + TaskListTemplate(ctx context.Context, taskListName string) (*Template, error) + TaskTemplates(ctx context.Context, names []string) (map[string]*task.Task, error) +} + +type Queue interface { + AddTaskList(ctx context.Context, taskList *TaskList, tasks []*task.Task) error +} + +type Service struct { + storage Storage + queue Queue + + logger *zap.Logger +} + +func New(template Storage, queue Queue, logger *zap.Logger) *Service { + return &Service{ + storage: template, + queue: queue, + logger: logger, + } +} + +// Create a taskList and corresponding tasks and put them in +// respective queues for execution. +func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListRequest) (*goatasklist.CreateTaskListResult, error) { + if req.TaskListName == "" { + return nil, errors.New(errors.BadRequest, "missing taskListName") + } + + logger := s.logger.With(zap.String("taskListName", req.TaskListName)) + + // get predefined taskList definition from storage + template, err := s.storage.TaskListTemplate(ctx, req.TaskListName) + if err != nil { + logger.Error("error getting taskList template from storage", zap.Error(err)) + return nil, err + } + + // get predefined task definitions from storage + taskTemplates, err := s.storage.TaskTemplates(ctx, taskNamesFromTaskListTemplate(template)) + if err != nil { + logger.Error("error getting task templates from storage") + return nil, err + } + + taskListRequest, err := json.Marshal(req.Data) + if err != nil { + logger.Error("error marshaling request data to JSON", zap.Error(err)) + return nil, errors.New(errors.BadRequest, "error marshaling request data to JSON", err) + } + + taskList := &TaskList{ + ID: uuid.NewString(), + Groups: createGroups(template, taskListRequest), + Name: template.Name, + Request: taskListRequest, + CacheScope: template.CacheScope, + CacheNamespace: template.CacheNamespace, + State: task.Created, + CreatedAt: time.Now(), + } + + // if cache namespace and scope are given, use them instead of the defaults + if req.CacheNamespace != nil && *req.CacheNamespace != "" { + taskList.CacheNamespace = *req.CacheNamespace + } + if req.CacheScope != nil && *req.CacheScope != "" { + taskList.CacheScope = *req.CacheScope + } + + tasks, err := createTasks(taskList, taskTemplates) + if err != nil { + logger.Error("failed to create tasks for taskList", zap.Error(err)) + return nil, errors.New("failed to create tasks for taskList", err) + } + + if err := s.queue.AddTaskList(ctx, taskList, tasks); err != nil { + logger.Error("error adding taskList to queue", zap.Error(err)) + return nil, errors.New("error adding taskList to queue", err) + } + + return &goatasklist.CreateTaskListResult{ + TaskListID: taskList.ID, + }, nil +} + +func createGroups(t *Template, req []byte) []Group { + var groups []Group + for _, group := range t.Groups { + g := Group{ + ID: uuid.NewString(), + Execution: group.Execution, + Tasks: group.Tasks, + State: task.Created, + Request: req, + FinalPolicy: group.FinalPolicy, + } + groups = append(groups, g) + } + + return groups +} + +// createTasks creates task.Task instances out of task templates +// in order to be added to queue for execution +func createTasks(t *TaskList, templates map[string]*task.Task) ([]*task.Task, error) { + var tasks []*task.Task + for _, group := range t.Groups { + for _, taskName := range group.Tasks { + template, ok := templates[taskName] + if !ok { + return nil, errors.New(errors.NotFound, "failed to find task template") + } + + task := task.Task{ + ID: uuid.NewString(), + GroupID: group.ID, + Name: taskName, + State: task.Created, + URL: template.URL, + Method: template.Method, + RequestPolicy: template.RequestPolicy, + ResponsePolicy: template.ResponsePolicy, + FinalPolicy: template.FinalPolicy, + CacheNamespace: template.CacheNamespace, + CacheScope: template.CacheScope, + CreatedAt: time.Now(), + } + + // if cache namespace and scope are set in the taskList, use them instead of the defaults + if t.CacheNamespace != "" { + task.CacheNamespace = t.CacheNamespace + } + if t.CacheScope != "" { + task.CacheScope = t.CacheScope + } + + tasks = append(tasks, &task) + } + } + + return tasks, nil +} + +// taskNamesFromTaskListTemplate returns the names of all tasks within +// one taskList template +func taskNamesFromTaskListTemplate(template *Template) []string { + var names []string + for _, group := range template.Groups { + names = append(names, group.Tasks...) + } + + return names +} diff --git a/internal/service/tasklist/service_test.go b/internal/service/tasklist/service_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9dd3b39e39b25076f55f206661264627f2917c07 --- /dev/null +++ b/internal/service/tasklist/service_test.go @@ -0,0 +1,140 @@ +package tasklist_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + goatasklist "code.vereign.com/gaiax/tsa/task/gen/task_list" + "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist/tasklistfakes" +) + +func TestNew(t *testing.T) { + svc := tasklist.New(nil, nil, zap.NewNop()) + assert.Implements(t, (*goatasklist.Service)(nil), svc) +} + +func Test_Create(t *testing.T) { + tests := []struct { + name string + req *goatasklist.CreateTaskListRequest + storage *tasklistfakes.FakeStorage + queue *tasklistfakes.FakeQueue + + errkind errors.Kind + errtext string + }{ + { + name: "empty taskList name", + req: &goatasklist.CreateTaskListRequest{}, + errkind: errors.BadRequest, + errtext: "missing taskListName", + }, + { + name: "taskList template not found", + req: &goatasklist.CreateTaskListRequest{TaskListName: "taskList name"}, + storage: &tasklistfakes.FakeStorage{ + TaskListTemplateStub: func(ctx context.Context, s string) (*tasklist.Template, error) { + return nil, errors.New(errors.NotFound) + }, + }, + errkind: errors.NotFound, + errtext: "not found", + }, + { + name: "error getting task templates form storage", + req: &goatasklist.CreateTaskListRequest{TaskListName: "taskList name"}, + storage: &tasklistfakes.FakeStorage{ + TaskListTemplateStub: func(ctx context.Context, s string) (*tasklist.Template, error) { + return &tasklist.Template{}, nil + }, + TaskTemplatesStub: func(ctx context.Context, strings []string) (map[string]*task.Task, error) { + return nil, errors.New(errors.Internal, "internal error") + }, + }, + errkind: errors.Internal, + errtext: "internal error", + }, + { + name: "error creating tasks for a taskList, task template not found", + req: &goatasklist.CreateTaskListRequest{TaskListName: "taskList name"}, + storage: &tasklistfakes.FakeStorage{ + TaskListTemplateStub: func(ctx context.Context, s string) (*tasklist.Template, error) { + return &tasklist.Template{ + Groups: []tasklist.GroupTemplate{ + { + Tasks: []string{"non-existent task template"}, + }, + }, + }, nil + }, + TaskTemplatesStub: func(ctx context.Context, strings []string) (map[string]*task.Task, error) { + return map[string]*task.Task{"template": &task.Task{}}, nil + }, + }, + errkind: errors.NotFound, + errtext: "failed to find task template", + }, + { + name: "failed to add taskList and tasks to queue", + req: &goatasklist.CreateTaskListRequest{TaskListName: "taskList name"}, + storage: &tasklistfakes.FakeStorage{ + TaskListTemplateStub: func(ctx context.Context, s string) (*tasklist.Template, error) { + return &tasklist.Template{}, nil + }, + TaskTemplatesStub: func(ctx context.Context, strings []string) (map[string]*task.Task, error) { + return map[string]*task.Task{"template": &task.Task{}}, nil + }, + }, + queue: &tasklistfakes.FakeQueue{ + AddTaskListStub: func(ctx context.Context, list *tasklist.TaskList, tasks []*task.Task) error { + return errors.New("storage error") + }, + }, + errkind: errors.Unknown, + errtext: "storage error", + }, + { + name: "successfully add taskList and tasks to queue", + req: &goatasklist.CreateTaskListRequest{TaskListName: "taskList name"}, + storage: &tasklistfakes.FakeStorage{ + TaskListTemplateStub: func(ctx context.Context, s string) (*tasklist.Template, error) { + return &tasklist.Template{}, nil + }, + TaskTemplatesStub: func(ctx context.Context, strings []string) (map[string]*task.Task, error) { + return map[string]*task.Task{"template": &task.Task{}}, nil + }, + }, + queue: &tasklistfakes.FakeQueue{ + AddTaskListStub: func(ctx context.Context, list *tasklist.TaskList, tasks []*task.Task) error { + return nil + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + svc := tasklist.New(test.storage, test.queue, zap.NewNop()) + res, err := svc.Create(context.Background(), test.req) + if err != nil { + assert.NotEmpty(t, test.errtext) + e, ok := err.(*errors.Error) + assert.True(t, ok) + assert.Equal(t, test.errkind, e.Kind) + assert.Contains(t, e.Error(), test.errtext) + assert.Nil(t, res) + } else { + assert.Empty(t, test.errtext) + assert.NotNil(t, res) + assert.NotEmpty(t, res.TaskListID) + } + + }) + } +} diff --git a/internal/service/tasklist/task_list.go b/internal/service/tasklist/task_list.go new file mode 100644 index 0000000000000000000000000000000000000000..0c9827e01d4555f09366b1b2808e51c4aa7c7d60 --- /dev/null +++ b/internal/service/tasklist/task_list.go @@ -0,0 +1,42 @@ +package tasklist + +import ( + "time" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type Template struct { + Name string `json:"name"` + CacheNamespace string `json:"cacheNamespace"` + CacheScope string `json:"cacheScope"` + Groups []GroupTemplate `json:"groups"` +} + +type GroupTemplate struct { + Execution string `json:"execution"` + FinalPolicy string `json:"finalPolicy"` + Tasks []string `json:"tasks"` +} + +type TaskList struct { + ID string `json:"id"` + Name string `json:"name"` + State task.State `json:"state"` + Groups []Group `json:"groups"` + Request []byte `json:"request"` + CacheNamespace string `json:"cacheNamespace"` + CacheScope string `json:"cacheScope"` + CreatedAt time.Time `json:"createdAt"` + StartedAt time.Time `json:"startedAt"` + FinishedAt time.Time `json:"finishedAt"` +} + +type Group struct { + ID string `json:"id"` + Execution string `json:"execution"` + Tasks []string `json:"tasks"` + State task.State `json:"state"` + Request []byte `json:"request"` + FinalPolicy string `json:"finalPolicy"` +} diff --git a/internal/service/tasklist/tasklistfakes/fake_queue.go b/internal/service/tasklist/tasklistfakes/fake_queue.go new file mode 100644 index 0000000000000000000000000000000000000000..75e572da1f1ea976808a9028be753d94b936f132 --- /dev/null +++ b/internal/service/tasklist/tasklistfakes/fake_queue.go @@ -0,0 +1,122 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package tasklistfakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" +) + +type FakeQueue struct { + AddTaskListStub func(context.Context, *tasklist.TaskList, []*task.Task) error + addTaskListMutex sync.RWMutex + addTaskListArgsForCall []struct { + arg1 context.Context + arg2 *tasklist.TaskList + arg3 []*task.Task + } + addTaskListReturns struct { + result1 error + } + addTaskListReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeQueue) AddTaskList(arg1 context.Context, arg2 *tasklist.TaskList, arg3 []*task.Task) error { + var arg3Copy []*task.Task + if arg3 != nil { + arg3Copy = make([]*task.Task, len(arg3)) + copy(arg3Copy, arg3) + } + fake.addTaskListMutex.Lock() + ret, specificReturn := fake.addTaskListReturnsOnCall[len(fake.addTaskListArgsForCall)] + fake.addTaskListArgsForCall = append(fake.addTaskListArgsForCall, struct { + arg1 context.Context + arg2 *tasklist.TaskList + arg3 []*task.Task + }{arg1, arg2, arg3Copy}) + stub := fake.AddTaskListStub + fakeReturns := fake.addTaskListReturns + fake.recordInvocation("AddTaskList", []interface{}{arg1, arg2, arg3Copy}) + fake.addTaskListMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeQueue) AddTaskListCallCount() int { + fake.addTaskListMutex.RLock() + defer fake.addTaskListMutex.RUnlock() + return len(fake.addTaskListArgsForCall) +} + +func (fake *FakeQueue) AddTaskListCalls(stub func(context.Context, *tasklist.TaskList, []*task.Task) error) { + fake.addTaskListMutex.Lock() + defer fake.addTaskListMutex.Unlock() + fake.AddTaskListStub = stub +} + +func (fake *FakeQueue) AddTaskListArgsForCall(i int) (context.Context, *tasklist.TaskList, []*task.Task) { + fake.addTaskListMutex.RLock() + defer fake.addTaskListMutex.RUnlock() + argsForCall := fake.addTaskListArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeQueue) AddTaskListReturns(result1 error) { + fake.addTaskListMutex.Lock() + defer fake.addTaskListMutex.Unlock() + fake.AddTaskListStub = nil + fake.addTaskListReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) AddTaskListReturnsOnCall(i int, result1 error) { + fake.addTaskListMutex.Lock() + defer fake.addTaskListMutex.Unlock() + fake.AddTaskListStub = nil + if fake.addTaskListReturnsOnCall == nil { + fake.addTaskListReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.addTaskListReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.addTaskListMutex.RLock() + defer fake.addTaskListMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeQueue) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ tasklist.Queue = new(FakeQueue) diff --git a/internal/service/tasklist/tasklistfakes/fake_storage.go b/internal/service/tasklist/tasklistfakes/fake_storage.go new file mode 100644 index 0000000000000000000000000000000000000000..9ece29b9180862d8281bd5759b4104ff9a731e67 --- /dev/null +++ b/internal/service/tasklist/tasklistfakes/fake_storage.go @@ -0,0 +1,206 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package tasklistfakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" +) + +type FakeStorage struct { + TaskListTemplateStub func(context.Context, string) (*tasklist.Template, error) + taskListTemplateMutex sync.RWMutex + taskListTemplateArgsForCall []struct { + arg1 context.Context + arg2 string + } + taskListTemplateReturns struct { + result1 *tasklist.Template + result2 error + } + taskListTemplateReturnsOnCall map[int]struct { + result1 *tasklist.Template + result2 error + } + TaskTemplatesStub func(context.Context, []string) (map[string]*task.Task, error) + taskTemplatesMutex sync.RWMutex + taskTemplatesArgsForCall []struct { + arg1 context.Context + arg2 []string + } + taskTemplatesReturns struct { + result1 map[string]*task.Task + result2 error + } + taskTemplatesReturnsOnCall map[int]struct { + result1 map[string]*task.Task + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeStorage) TaskListTemplate(arg1 context.Context, arg2 string) (*tasklist.Template, error) { + fake.taskListTemplateMutex.Lock() + ret, specificReturn := fake.taskListTemplateReturnsOnCall[len(fake.taskListTemplateArgsForCall)] + fake.taskListTemplateArgsForCall = append(fake.taskListTemplateArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.TaskListTemplateStub + fakeReturns := fake.taskListTemplateReturns + fake.recordInvocation("TaskListTemplate", []interface{}{arg1, arg2}) + fake.taskListTemplateMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) TaskListTemplateCallCount() int { + fake.taskListTemplateMutex.RLock() + defer fake.taskListTemplateMutex.RUnlock() + return len(fake.taskListTemplateArgsForCall) +} + +func (fake *FakeStorage) TaskListTemplateCalls(stub func(context.Context, string) (*tasklist.Template, error)) { + fake.taskListTemplateMutex.Lock() + defer fake.taskListTemplateMutex.Unlock() + fake.TaskListTemplateStub = stub +} + +func (fake *FakeStorage) TaskListTemplateArgsForCall(i int) (context.Context, string) { + fake.taskListTemplateMutex.RLock() + defer fake.taskListTemplateMutex.RUnlock() + argsForCall := fake.taskListTemplateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) TaskListTemplateReturns(result1 *tasklist.Template, result2 error) { + fake.taskListTemplateMutex.Lock() + defer fake.taskListTemplateMutex.Unlock() + fake.TaskListTemplateStub = nil + fake.taskListTemplateReturns = struct { + result1 *tasklist.Template + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskListTemplateReturnsOnCall(i int, result1 *tasklist.Template, result2 error) { + fake.taskListTemplateMutex.Lock() + defer fake.taskListTemplateMutex.Unlock() + fake.TaskListTemplateStub = nil + if fake.taskListTemplateReturnsOnCall == nil { + fake.taskListTemplateReturnsOnCall = make(map[int]struct { + result1 *tasklist.Template + result2 error + }) + } + fake.taskListTemplateReturnsOnCall[i] = struct { + result1 *tasklist.Template + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskTemplates(arg1 context.Context, arg2 []string) (map[string]*task.Task, error) { + var arg2Copy []string + if arg2 != nil { + arg2Copy = make([]string, len(arg2)) + copy(arg2Copy, arg2) + } + fake.taskTemplatesMutex.Lock() + ret, specificReturn := fake.taskTemplatesReturnsOnCall[len(fake.taskTemplatesArgsForCall)] + fake.taskTemplatesArgsForCall = append(fake.taskTemplatesArgsForCall, struct { + arg1 context.Context + arg2 []string + }{arg1, arg2Copy}) + stub := fake.TaskTemplatesStub + fakeReturns := fake.taskTemplatesReturns + fake.recordInvocation("TaskTemplates", []interface{}{arg1, arg2Copy}) + fake.taskTemplatesMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) TaskTemplatesCallCount() int { + fake.taskTemplatesMutex.RLock() + defer fake.taskTemplatesMutex.RUnlock() + return len(fake.taskTemplatesArgsForCall) +} + +func (fake *FakeStorage) TaskTemplatesCalls(stub func(context.Context, []string) (map[string]*task.Task, error)) { + fake.taskTemplatesMutex.Lock() + defer fake.taskTemplatesMutex.Unlock() + fake.TaskTemplatesStub = stub +} + +func (fake *FakeStorage) TaskTemplatesArgsForCall(i int) (context.Context, []string) { + fake.taskTemplatesMutex.RLock() + defer fake.taskTemplatesMutex.RUnlock() + argsForCall := fake.taskTemplatesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) TaskTemplatesReturns(result1 map[string]*task.Task, result2 error) { + fake.taskTemplatesMutex.Lock() + defer fake.taskTemplatesMutex.Unlock() + fake.TaskTemplatesStub = nil + fake.taskTemplatesReturns = struct { + result1 map[string]*task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskTemplatesReturnsOnCall(i int, result1 map[string]*task.Task, result2 error) { + fake.taskTemplatesMutex.Lock() + defer fake.taskTemplatesMutex.Unlock() + fake.TaskTemplatesStub = nil + if fake.taskTemplatesReturnsOnCall == nil { + fake.taskTemplatesReturnsOnCall = make(map[int]struct { + result1 map[string]*task.Task + result2 error + }) + } + fake.taskTemplatesReturnsOnCall[i] = struct { + result1 map[string]*task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.taskListTemplateMutex.RLock() + defer fake.taskListTemplateMutex.RUnlock() + fake.taskTemplatesMutex.RLock() + defer fake.taskTemplatesMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeStorage) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ tasklist.Storage = new(FakeStorage) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index f5b44125ce771b4bec90965e4637bd8e115906b9..cc16df1cd4392949218e17a91b7ab8995395864d 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -11,31 +11,41 @@ import ( "code.vereign.com/gaiax/tsa/golib/errors" "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/tasklist" ) const ( - taskDB = "task" - taskTemplates = "taskTemplates" - taskQueue = "tasks" - tasksHistory = "tasksHistory" + taskDB = "task" + taskTemplates = "taskTemplates" + taskQueue = "tasks" + tasksHistory = "tasksHistory" + taskListQueue = "taskLists" + taskListTemplates = "taskListTemplates" + taskListHistory = "taskListHistory" ) type Storage struct { - templates *mongo.Collection - tasks *mongo.Collection - tasksHistory *mongo.Collection + taskTemplates *mongo.Collection + tasks *mongo.Collection + tasksHistory *mongo.Collection + taskLists *mongo.Collection + taskListTemplates *mongo.Collection + taskListHistory *mongo.Collection } func New(db *mongo.Client) *Storage { return &Storage{ - templates: db.Database(taskDB).Collection(taskTemplates), - tasks: db.Database(taskDB).Collection(taskQueue), - tasksHistory: db.Database(taskDB).Collection(tasksHistory), + taskTemplates: db.Database(taskDB).Collection(taskTemplates), + tasks: db.Database(taskDB).Collection(taskQueue), + tasksHistory: db.Database(taskDB).Collection(tasksHistory), + taskListTemplates: db.Database(taskDB).Collection(taskListTemplates), + taskLists: db.Database(taskDB).Collection(taskListQueue), + taskListHistory: db.Database(taskDB).Collection(taskListHistory), } } func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) { - result := s.templates.FindOne(ctx, bson.M{ + result := s.taskTemplates.FindOne(ctx, bson.M{ "name": taskName, }) @@ -59,8 +69,8 @@ func (s *Storage) Add(ctx context.Context, task *task.Task) error { return err } -// Poll retrieves one task from the tasks collection with the -// older ones being retrieved first (FIFO). It updates the state +// Poll retrieves one task with empty groupID from the tasks collection +// with the older ones being retrieved first (FIFO). It updates the state // of the task to "pending", so that consequent calls to Poll would // not retrieve the same task. func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { @@ -69,7 +79,7 @@ func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { SetSort(bson.M{"createdAt": 1}). SetReturnDocument(options.After) - filter := bson.M{"state": task.Created} + filter := bson.M{"state": task.Created, "groupid": ""} update := bson.M{"$set": bson.M{"state": task.Pending}} result := s.tasks.FindOneAndUpdate( ctx, @@ -158,3 +168,150 @@ func (s *Storage) TaskHistory(ctx context.Context, taskID string) (*task.Task, e return &task, nil } + +// TaskListTemplate retrieves one taskList definition by name from storage +func (s *Storage) TaskListTemplate(ctx context.Context, taskListName string) (*tasklist.Template, error) { + result := s.taskListTemplates.FindOne(ctx, bson.M{ + "name": taskListName, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "taskList template not found") + } + return nil, result.Err() + } + + var tasklist tasklist.Template + if err := result.Decode(&tasklist); err != nil { + return nil, err + } + + return &tasklist, nil +} + +// TaskTemplates retrieves task definitions from storage by names. +// +// The result is a map where 'key' is the task name and 'value' is the task definition +func (s *Storage) TaskTemplates(ctx context.Context, names []string) (map[string]*task.Task, error) { + cursor, err := s.taskTemplates.Find(ctx, bson.M{ + "name": bson.M{"$in": names}, + }) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + res := make(map[string]*task.Task) + for cursor.Next(ctx) { + var task task.Task + if err := cursor.Decode(&task); err != nil { + return nil, err + } + res[task.Name] = &task + } + + return res, nil +} + +func (s *Storage) AddTaskList(ctx context.Context, taskList *tasklist.TaskList, tasks []*task.Task) error { + _, err := s.taskLists.InsertOne(ctx, taskList) + if err != nil { + return err + } + + var ti []interface{} + for _, task := range tasks { + ti = append(ti, task) + } + + _, err = s.tasks.InsertMany(ctx, ti) + if err != nil { + if err := s.AckList(ctx, taskList); err != nil { // remove taskList from queue + return errors.New("failed to ack taskList", err) + } + return err + } + + return nil +} + +// AckList removes a taskList from the `tasksLists` collection. +func (s *Storage) AckList(ctx context.Context, taskList *tasklist.TaskList) error { + _, err := s.taskLists.DeleteOne(ctx, bson.M{"id": taskList.ID}) + return err +} + +// PollList retrieves one taskList from the taskLists collection +// with the older ones being retrieved first (FIFO). It updates the state +// of the task to "pending", so that consequent calls to PollList would +// not retrieve the same task. +func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) { + opts := options. + FindOneAndUpdate(). + SetSort(bson.M{"createdAt": 1}). + SetReturnDocument(options.After) + + filter := bson.M{"state": task.Created} + update := bson.M{"$set": bson.M{"state": task.Pending}} + result := s.taskLists.FindOneAndUpdate( + ctx, + filter, + update, + opts, + ) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "taskList not found") + } + return nil, result.Err() + } + + var list tasklist.TaskList + if err := result.Decode(&list); err != nil { + return nil, err + } + + return &list, nil +} + +// GetGroupTasks fetches all tasks by a groupID +func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) { + filter := bson.M{"groupid": group.ID} + opts := options.Find().SetSort(bson.M{"createdAt": 1}) + + cursor, err := s.tasks.Find(ctx, filter, opts) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var tasks []*task.Task + for cursor.Next(ctx) { + var task task.Task + if err := cursor.Decode(&task); err != nil { + return nil, err + } + tasks = append(tasks, &task) + } + + return tasks, nil +} + +// AckGroupTasks removes tasks from tasks collection by groupID +func (s *Storage) AckGroupTasks(ctx context.Context, group *tasklist.Group) error { + _, err := s.tasks.DeleteMany(ctx, bson.M{"groupid": group.ID}) + return err +} + +// SaveTaskListHistory adds a tasklist to the taskListHistory collection +func (s *Storage) SaveTaskListHistory(ctx context.Context, taskList *tasklist.TaskList) error { + insert := func() error { + _, err := s.taskListHistory.InsertOne(ctx, taskList) + return err + } + + b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + return backoff.Retry(insert, b) +}