diff --git a/cmd/task/main.go b/cmd/task/main.go index c1c990b580f337f3325e88973d5f99b103a4b378..0e879c569aaa2eaa205d8c4176547c24071a670f 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -8,6 +8,8 @@ import ( "time" "github.com/kelseyhightower/envconfig" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "go.uber.org/zap/zapcore" goahttp "goa.design/goa/v3/http" @@ -18,10 +20,14 @@ import ( goahealth "code.vereign.com/gaiax/tsa/task/gen/health" goahealthsrv "code.vereign.com/gaiax/tsa/task/gen/http/health/server" goaopenapisrv "code.vereign.com/gaiax/tsa/task/gen/http/openapi/server" + goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" "code.vereign.com/gaiax/tsa/task/gen/openapi" + goatask "code.vereign.com/gaiax/tsa/task/gen/task" "code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" + "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/storage" ) var Version = "0.0.0+development" @@ -38,22 +44,42 @@ func main() { } defer logger.Sync() //nolint:errcheck - logger.Info("start task service", zap.String("version", Version), zap.String("goa", goa.Version())) + logger.Info("task service started", zap.String("version", Version), zap.String("goa", goa.Version())) + + // connect to mongo db + db, err := mongo.Connect( + context.Background(), + options.Client().ApplyURI(cfg.Mongo.Addr).SetAuth(options.Credential{ + Username: cfg.Mongo.User, + Password: cfg.Mongo.Pass, + }), + ) + if err != nil { + logger.Fatal("error connecting to mongodb", zap.Error(err)) + } + defer db.Disconnect(context.Background()) //nolint:errcheck + + // create storage + storage := storage.New(db) // create services var ( + taskSvc goatask.Service healthSvc goahealth.Service ) { + taskSvc = task.New(storage, storage, logger) healthSvc = health.New() } // create endpoints var ( + taskEndpoints *goatask.Endpoints healthEndpoints *goahealth.Endpoints openapiEndpoints *openapi.Endpoints ) { + taskEndpoints = goatask.NewEndpoints(taskSvc) healthEndpoints = goahealth.NewEndpoints(healthSvc) openapiEndpoints = openapi.NewEndpoints(nil) } @@ -76,15 +102,18 @@ func main() { // the service input and output data structures to HTTP requests and // responses. var ( + taskServer *goatasksrv.Server healthServer *goahealthsrv.Server openapiServer *goaopenapisrv.Server ) { + taskServer = goatasksrv.New(taskEndpoints, mux, dec, enc, nil, errFormatter) healthServer = goahealthsrv.New(healthEndpoints, mux, dec, enc, nil, errFormatter) openapiServer = goaopenapisrv.New(openapiEndpoints, mux, dec, enc, nil, errFormatter, nil, nil) } // Configure the mux. + goatasksrv.Mount(mux, taskServer) goahealthsrv.Mount(mux, healthServer) goaopenapisrv.Mount(mux, openapiServer) diff --git a/design/types.go b/design/types.go index dbddbccd2becc16e6f2ce6c7f82f239f71318ab7..46eb0674fd655eab128a305629756b16ad603f63 100644 --- a/design/types.go +++ b/design/types.go @@ -5,7 +5,7 @@ import . "goa.design/goa/v3/dsl" var CreateRequest = Type("CreateRequest", func() { Field(1, "taskName", String, "Task name.") - Field(2, "data", Any, "Data contains JSON payload that will be used for task exection.") + Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.") Required("taskName", "data") }) diff --git a/gen/http/openapi.json b/gen/http/openapi.json index e3b530a87656bab17f252a35711ad4d5479e0a8a..49774f5de7bd482690963e51d82ed70d59336815 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task exection.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index 31883e5e2246dbcdd86a60db8d2fe1495e0e8c67..61bbc9cae68f700f4c9cede40c777ce525718f22 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -50,7 +50,7 @@ paths: type: string - name: any in: body - description: Data contains JSON payload that will be used for task exection. + description: Data contains JSON payload that will be used for task execution. required: true schema: type: string diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index 0fd87f26f8fb8d556dbf32fd6589849252c02ee1..d579254c652ff77ebd83f7ed6833017cbb673e76 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task exection.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task exection.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 95908aa62217168b60d2268813d2c62ce834aca6..277245235bd2acb3a2e9c016dce06063e30b1afd 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -43,13 +43,13 @@ paths: example: Eveniet et eligendi sint quibusdam quia maxime. example: Et ipsa voluptate. requestBody: - description: Data contains JSON payload that will be used for task exection. + description: Data contains JSON payload that will be used for task execution. required: true content: application/json: schema: type: string - description: Data contains JSON payload that will be used for task exection. + description: Data contains JSON payload that will be used for task execution. example: Nam et. format: binary example: Corrupti quia autem dolorum sunt aperiam quaerat. diff --git a/gen/task/service.go b/gen/task/service.go index 4417a398bdde609c50ff5c20dcc723c4a6b9e731..6bd66d5b79088ae0d78f536af3b26e70d75d3685 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -31,7 +31,7 @@ var MethodNames = [1]string{"Create"} type CreateRequest struct { // Task name. TaskName string - // Data contains JSON payload that will be used for task exection. + // Data contains JSON payload that will be used for task execution. Data interface{} } diff --git a/internal/config/config.go b/internal/config/config.go index bdb25c11fa2d8aa3c2ea8c19adb6d93c15d65799..0d0b375d7984af01183f3a1cd30fa12116a63326 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,7 +3,8 @@ package config import "time" type Config struct { - HTTP httpConfig + HTTP httpConfig + Mongo mongoConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -15,3 +16,9 @@ type httpConfig struct { ReadTimeout time.Duration `envconfig:"HTTP_READ_TIMEOUT" default:"10s"` WriteTimeout time.Duration `envconfig:"HTTP_WRITE_TIMEOUT" default:"10s"` } + +type mongoConfig struct { + Addr string `envconfig:"MONGO_ADDR" required:"true"` + User string `envconfig:"MONGO_USER" required:"true"` + Pass string `envconfig:"MONGO_PASS" required:"true"` +} diff --git a/internal/service/task/service.go b/internal/service/task/service.go new file mode 100644 index 0000000000000000000000000000000000000000..024e1112a6e84aac8da03d4e25b48f324027b88c --- /dev/null +++ b/internal/service/task/service.go @@ -0,0 +1,65 @@ +package task + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + goatask "code.vereign.com/gaiax/tsa/task/gen/task" +) + +type Storage interface { + TaskTemplate(ctx context.Context, taskName string) (*Task, error) +} + +type Queue interface { + AddTask(ctx context.Context, task *Task) error +} + +type Service struct { + storage Storage + queue Queue + logger *zap.Logger +} + +func New(template Storage, queue Queue, logger *zap.Logger) *Service { + return &Service{ + storage: template, + queue: queue, + logger: logger, + } +} + +// Create a new task. +func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { + logger := s.logger.With(zap.String("taskName", req.TaskName)) + + // get predefined task definition from storage + task, err := s.storage.TaskTemplate(ctx, req.TaskName) + if err != nil { + logger.Error("error getting task template from storage", zap.Error(err)) + return nil, errors.New("error executing task", err) + } + + taskRequest, err := json.Marshal(req.Data) + if err != nil { + logger.Error("error marshaling request data to JSON", zap.Error(err)) + return nil, errors.New(errors.BadRequest, "error marshaling request data to JSON", err) + } + + task.ID = uuid.NewString() + task.State = Created + task.CreatedAt = time.Now() + task.Request = taskRequest + + if err := s.queue.AddTask(ctx, task); err != nil { + logger.Error("error adding task to queue", zap.Error(err)) + return nil, errors.New("failed to create task", err) + } + + return &goatask.CreateResult{TaskID: task.ID}, nil +} diff --git a/internal/service/task/task.go b/internal/service/task/task.go new file mode 100644 index 0000000000000000000000000000000000000000..4a2a677974cbf2cf08d6143fdb59679ad8fd2869 --- /dev/null +++ b/internal/service/task/task.go @@ -0,0 +1,48 @@ +package task + +import "time" + +type State string + +const ( + // Created is the initial task state. + Created = "created" + + // Pending state is when a worker has marked the task for processing + // indicating to other workers that they should not process the task. + Pending = "pending" + + // Done state is when the task is completed. + // TODO(penkovski): do we need this state if task is deleted after it is done? + Done = "done" +) + +type Task struct { + ID string `json:"id"` // ID is unique task identifier. + GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`. + Name string `json:"name"` // Name is used by external callers use to create tasks. + State State `json:"state"` // State of the task. + URL string `json:"url"` // URL against which the task request will be executed. + Method string `json:"method"` // HTTP method of the task request. + Request []byte `json:"request"` // Request body which will be sent in the task request. + Response []byte `json:"response"` // Response received after the task request is executed. + ResponseCode int `json:"responseCode"` // Response code received after task request is executed. + RequestPolicy string `json:"requestPolicy"` // Request policy to be executed before task request execution. + ResponsePolicy string `json:"responsePolicy"` // Response policy to be executed before the response is accepted. + FinalPolicy string `json:"finalPolicy"` // TODO(penkovski): how is different from ResponsePolicy? + CacheKey string `json:"cacheKey"` // CacheKey used for storing the response in cache. + CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time. + StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time. + FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. +} + +type Group struct { + ID string + Tasks []Task + State State + Metadata interface{} // TODO(penkovski): not yet clear + FinalPolicy string + CreateTime time.Time + StartTime time.Time + FinishTime time.Time +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000000000000000000000000000000000000..be5eff9b7640316c975dc836fdb4caedaa50f434 --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,55 @@ +package storage + +import ( + "context" + "strings" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +const ( + taskDB = "task" + taskTemplates = "taskTemplates" + taskQueue = "tasks" +) + +type Storage struct { + templates *mongo.Collection + tasks *mongo.Collection +} + +func New(db *mongo.Client) *Storage { + return &Storage{ + templates: db.Database(taskDB).Collection(taskTemplates), + tasks: db.Database(taskDB).Collection(taskQueue), + } +} + +func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) { + result := s.templates.FindOne(ctx, bson.M{ + "name": taskName, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task template not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +} + +func (s *Storage) AddTask(ctx context.Context, task *task.Task) error { + _, err := s.tasks.InsertOne(ctx, task) + return err +}