Skip to content
Snippets Groups Projects
Commit e65e6221 authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Implement create task HTTP endpoint

parent d92a73cd
No related branches found
No related tags found
1 merge request!2Create task HTTP endpoint
Pipeline #49807 passed with stage
in 38 seconds
......@@ -8,6 +8,8 @@ import (
"time"
"github.com/kelseyhightower/envconfig"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
goahttp "goa.design/goa/v3/http"
......@@ -18,10 +20,14 @@ import (
goahealth "code.vereign.com/gaiax/tsa/task/gen/health"
goahealthsrv "code.vereign.com/gaiax/tsa/task/gen/http/health/server"
goaopenapisrv "code.vereign.com/gaiax/tsa/task/gen/http/openapi/server"
goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server"
"code.vereign.com/gaiax/tsa/task/gen/openapi"
goatask "code.vereign.com/gaiax/tsa/task/gen/task"
"code.vereign.com/gaiax/tsa/task/internal/config"
"code.vereign.com/gaiax/tsa/task/internal/service"
"code.vereign.com/gaiax/tsa/task/internal/service/health"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
"code.vereign.com/gaiax/tsa/task/internal/storage"
)
var Version = "0.0.0+development"
......@@ -38,22 +44,42 @@ func main() {
}
defer logger.Sync() //nolint:errcheck
logger.Info("start task service", zap.String("version", Version), zap.String("goa", goa.Version()))
logger.Info("task service started", zap.String("version", Version), zap.String("goa", goa.Version()))
// connect to mongo db
db, err := mongo.Connect(
context.Background(),
options.Client().ApplyURI(cfg.Mongo.Addr).SetAuth(options.Credential{
Username: cfg.Mongo.User,
Password: cfg.Mongo.Pass,
}),
)
if err != nil {
logger.Fatal("error connecting to mongodb", zap.Error(err))
}
defer db.Disconnect(context.Background()) //nolint:errcheck
// create storage
storage := storage.New(db)
// create services
var (
taskSvc goatask.Service
healthSvc goahealth.Service
)
{
taskSvc = task.New(storage, storage, logger)
healthSvc = health.New()
}
// create endpoints
var (
taskEndpoints *goatask.Endpoints
healthEndpoints *goahealth.Endpoints
openapiEndpoints *openapi.Endpoints
)
{
taskEndpoints = goatask.NewEndpoints(taskSvc)
healthEndpoints = goahealth.NewEndpoints(healthSvc)
openapiEndpoints = openapi.NewEndpoints(nil)
}
......@@ -76,15 +102,18 @@ func main() {
// the service input and output data structures to HTTP requests and
// responses.
var (
taskServer *goatasksrv.Server
healthServer *goahealthsrv.Server
openapiServer *goaopenapisrv.Server
)
{
taskServer = goatasksrv.New(taskEndpoints, mux, dec, enc, nil, errFormatter)
healthServer = goahealthsrv.New(healthEndpoints, mux, dec, enc, nil, errFormatter)
openapiServer = goaopenapisrv.New(openapiEndpoints, mux, dec, enc, nil, errFormatter, nil, nil)
}
// Configure the mux.
goatasksrv.Mount(mux, taskServer)
goahealthsrv.Mount(mux, healthServer)
goaopenapisrv.Mount(mux, openapiServer)
......
......@@ -5,7 +5,7 @@ import . "goa.design/goa/v3/dsl"
var CreateRequest = Type("CreateRequest", func() {
Field(1, "taskName", String, "Task name.")
Field(2, "data", Any, "Data contains JSON payload that will be used for task exection.")
Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.")
Required("taskName", "data")
})
......
{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task exection.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}}
\ No newline at end of file
{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}}
\ No newline at end of file
......@@ -50,7 +50,7 @@ paths:
type: string
- name: any
in: body
description: Data contains JSON payload that will be used for task exection.
description: Data contains JSON payload that will be used for task execution.
required: true
schema:
type: string
......
{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task exection.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task exection.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]}
\ No newline at end of file
{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]}
\ No newline at end of file
......@@ -43,13 +43,13 @@ paths:
example: Eveniet et eligendi sint quibusdam quia maxime.
example: Et ipsa voluptate.
requestBody:
description: Data contains JSON payload that will be used for task exection.
description: Data contains JSON payload that will be used for task execution.
required: true
content:
application/json:
schema:
type: string
description: Data contains JSON payload that will be used for task exection.
description: Data contains JSON payload that will be used for task execution.
example: Nam et.
format: binary
example: Corrupti quia autem dolorum sunt aperiam quaerat.
......
......@@ -31,7 +31,7 @@ var MethodNames = [1]string{"Create"}
type CreateRequest struct {
// Task name.
TaskName string
// Data contains JSON payload that will be used for task exection.
// Data contains JSON payload that will be used for task execution.
Data interface{}
}
......
......@@ -3,7 +3,8 @@ package config
import "time"
type Config struct {
HTTP httpConfig
HTTP httpConfig
Mongo mongoConfig
LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
}
......@@ -15,3 +16,9 @@ type httpConfig struct {
ReadTimeout time.Duration `envconfig:"HTTP_READ_TIMEOUT" default:"10s"`
WriteTimeout time.Duration `envconfig:"HTTP_WRITE_TIMEOUT" default:"10s"`
}
type mongoConfig struct {
Addr string `envconfig:"MONGO_ADDR" required:"true"`
User string `envconfig:"MONGO_USER" required:"true"`
Pass string `envconfig:"MONGO_PASS" required:"true"`
}
package task
import (
"context"
"encoding/json"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
goatask "code.vereign.com/gaiax/tsa/task/gen/task"
)
type Storage interface {
TaskTemplate(ctx context.Context, taskName string) (*Task, error)
}
type Queue interface {
AddTask(ctx context.Context, task *Task) error
}
type Service struct {
storage Storage
queue Queue
logger *zap.Logger
}
func New(template Storage, queue Queue, logger *zap.Logger) *Service {
return &Service{
storage: template,
queue: queue,
logger: logger,
}
}
// Create a new task.
func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) {
logger := s.logger.With(zap.String("taskName", req.TaskName))
// get predefined task definition from storage
task, err := s.storage.TaskTemplate(ctx, req.TaskName)
if err != nil {
logger.Error("error getting task template from storage", zap.Error(err))
return nil, errors.New("error executing task", err)
}
taskRequest, err := json.Marshal(req.Data)
if err != nil {
logger.Error("error marshaling request data to JSON", zap.Error(err))
return nil, errors.New(errors.BadRequest, "error marshaling request data to JSON", err)
}
task.ID = uuid.NewString()
task.State = Created
task.CreatedAt = time.Now()
task.Request = taskRequest
if err := s.queue.AddTask(ctx, task); err != nil {
logger.Error("error adding task to queue", zap.Error(err))
return nil, errors.New("failed to create task", err)
}
return &goatask.CreateResult{TaskID: task.ID}, nil
}
package task
import "time"
type State string
const (
// Created is the initial task state.
Created = "created"
// Pending state is when a worker has marked the task for processing
// indicating to other workers that they should not process the task.
Pending = "pending"
// Done state is when the task is completed.
// TODO(penkovski): do we need this state if task is deleted after it is done?
Done = "done"
)
type Task struct {
ID string `json:"id"` // ID is unique task identifier.
GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`.
Name string `json:"name"` // Name is used by external callers use to create tasks.
State State `json:"state"` // State of the task.
URL string `json:"url"` // URL against which the task request will be executed.
Method string `json:"method"` // HTTP method of the task request.
Request []byte `json:"request"` // Request body which will be sent in the task request.
Response []byte `json:"response"` // Response received after the task request is executed.
ResponseCode int `json:"responseCode"` // Response code received after task request is executed.
RequestPolicy string `json:"requestPolicy"` // Request policy to be executed before task request execution.
ResponsePolicy string `json:"responsePolicy"` // Response policy to be executed before the response is accepted.
FinalPolicy string `json:"finalPolicy"` // TODO(penkovski): how is different from ResponsePolicy?
CacheKey string `json:"cacheKey"` // CacheKey used for storing the response in cache.
CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time.
StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time.
FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done.
}
type Group struct {
ID string
Tasks []Task
State State
Metadata interface{} // TODO(penkovski): not yet clear
FinalPolicy string
CreateTime time.Time
StartTime time.Time
FinishTime time.Time
}
package storage
import (
"context"
"strings"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
const (
taskDB = "task"
taskTemplates = "taskTemplates"
taskQueue = "tasks"
)
type Storage struct {
templates *mongo.Collection
tasks *mongo.Collection
}
func New(db *mongo.Client) *Storage {
return &Storage{
templates: db.Database(taskDB).Collection(taskTemplates),
tasks: db.Database(taskDB).Collection(taskQueue),
}
}
func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) {
result := s.templates.FindOne(ctx, bson.M{
"name": taskName,
})
if result.Err() != nil {
if strings.Contains(result.Err().Error(), "no documents in result") {
return nil, errors.New(errors.NotFound, "task template not found")
}
return nil, result.Err()
}
var task task.Task
if err := result.Decode(&task); err != nil {
return nil, err
}
return &task, nil
}
func (s *Storage) AddTask(ctx context.Context, task *task.Task) error {
_, err := s.tasks.InsertOne(ctx, task)
return err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment