diff --git a/cmd/task/main.go b/cmd/task/main.go index d293aa64636f22d22b65438e0bd1056df823e6e7..a349fd17c7b7cb703187fde31e9cb172592a528a 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -24,6 +24,7 @@ import ( goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" + "code.vereign.com/gaiax/tsa/task/internal/clients/cache" "code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/executor" @@ -68,11 +69,15 @@ func main() { // create policy client policy := policy.New(cfg.Policy.Addr, httpClient()) + // create cache client + cache := cache.New(cfg.Cache.Addr) + // create task executor executor := executor.New( storage, policy, storage, + cache, cfg.Executor.Workers, cfg.Executor.PollInterval, httpClient(), diff --git a/design/design.go b/design/design.go index a733290c3bcb1c13b9af29ab2480c5d8773c3f03..0e21586fbef9ea8846c3c78c87d649076b2398c8 100644 --- a/design/design.go +++ b/design/design.go @@ -24,7 +24,16 @@ var _ = Service("task", func() { Result(CreateResult) HTTP(func() { POST("/v1/task/{taskName}") + + Header("cacheNamespace:x-cache-namespace", String, "Cache key namespace", func() { + Example("login") + }) + Header("cacheScope:x-cache-scope", String, "Cache key scope", func() { + Example("user") + }) + Body("data") + Response(StatusOK) }) }) diff --git a/design/types.go b/design/types.go index 46eb0674fd655eab128a305629756b16ad603f63..ed8662a42b4811224176ed58405dafc801b6f082 100644 --- a/design/types.go +++ b/design/types.go @@ -6,6 +6,8 @@ import . "goa.design/goa/v3/dsl" var CreateRequest = Type("CreateRequest", func() { Field(1, "taskName", String, "Task name.") Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.") + Field(3, "cacheNamespace", String, "Cache key namespace.") + Field(4, "cacheScope", String, "Cache key scope.") Required("taskName", "data") }) diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go index 248d9e39b29af21128fa47807c1d4eddba78f800..c859f0e4edee502712eaaba346baa5782937eddc 100644 --- a/gen/http/cli/task/cli.go +++ b/gen/http/cli/task/cli.go @@ -32,7 +32,7 @@ task create // UsageExamples produces an example of a valid invocation of the CLI tool. func UsageExamples() string { return os.Args[0] + ` health liveness` + "\n" + - os.Args[0] + ` task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum."` + "\n" + + os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" + "" } @@ -54,9 +54,11 @@ func ParseEndpoint( taskFlags = flag.NewFlagSet("task", flag.ContinueOnError) - taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) - taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "") - taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") + taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) + taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "") + taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") + taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "") + taskCreateCacheScopeFlag = taskCreateFlags.String("cache-scope", "", "") ) healthFlags.Usage = healthUsage healthLivenessFlags.Usage = healthLivenessUsage @@ -151,7 +153,7 @@ func ParseEndpoint( switch epn { case "create": endpoint = c.Create() - data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag) + data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag) } } } @@ -210,13 +212,15 @@ Additional help: `, os.Args[0]) } func taskCreateUsage() { - fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING + fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING -cache-namespace STRING -cache-scope STRING Create a task and put it in a queue for execution. -body JSON: -task-name STRING: Task name. + -cache-namespace STRING: + -cache-scope STRING: Example: - %[1]s task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum." + %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae." `, os.Args[0]) } diff --git a/gen/http/openapi.json b/gen/http/openapi.json index 49774f5de7bd482690963e51d82ed70d59336815..cbf1687e983ec137eb1e5d86c0eefec31be36b3d 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index 61bbc9cae68f700f4c9cede40c777ce525718f22..ca530f83c341213f3f961f435c8ecf88d4d75f6e 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -48,6 +48,16 @@ paths: description: Task name. required: true type: string + - name: x-cache-namespace + in: header + description: Cache key namespace + required: false + type: string + - name: x-cache-scope + in: header + description: Cache key scope + required: false + type: string - name: any in: body description: Data contains JSON payload that will be used for task execution. @@ -72,8 +82,8 @@ definitions: taskID: type: string description: Unique task identifier. - example: Corrupti facere sequi tempora eius assumenda molestiae. + example: Eaque excepturi suscipit veritatis nemo. example: - taskID: Laborum sapiente. + taskID: Corrupti quia autem dolorum sunt aperiam quaerat. required: - taskID diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index d579254c652ff77ebd83f7ed6833017cbb673e76..deb72810cb4565839f30f5ddb709475fc8d2bebf 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 277245235bd2acb3a2e9c016dce06063e30b1afd..8e14b7c0de5748cbd7901c7eea9745a8feb357ca 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -40,8 +40,26 @@ paths: schema: type: string description: Task name. - example: Eveniet et eligendi sint quibusdam quia maxime. - example: Et ipsa voluptate. + example: Vel odio et doloribus est quod laborum. + example: Harum aut autem aliquam dolorem non soluta. + - name: x-cache-namespace + in: header + description: Cache key namespace + allowEmptyValue: true + schema: + type: string + description: Cache key namespace + example: login + example: login + - name: x-cache-scope + in: header + description: Cache key scope + allowEmptyValue: true + schema: + type: string + description: Cache key scope + example: user + example: user requestBody: description: Data contains JSON payload that will be used for task execution. required: true @@ -50,9 +68,9 @@ paths: schema: type: string description: Data contains JSON payload that will be used for task execution. - example: Nam et. + example: Eveniet et eligendi sint quibusdam quia maxime. format: binary - example: Corrupti quia autem dolorum sunt aperiam quaerat. + example: Ipsam et est accusantium. responses: "200": description: OK response. @@ -61,7 +79,7 @@ paths: schema: $ref: '#/components/schemas/CreateResult' example: - taskID: Pariatur sequi et. + taskID: Facere quibusdam voluptate beatae. components: schemas: CreateResult: @@ -70,9 +88,9 @@ components: taskID: type: string description: Unique task identifier. - example: Facere quibusdam voluptate beatae. + example: Et ipsa voluptate. example: - taskID: Eaque excepturi suscipit veritatis nemo. + taskID: Quo qui fuga impedit eos fuga et. required: - taskID tags: diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go index 5ead7e96e6ebb3ceda6544b329fffe02c8a81a96..b6ff7fb25a49ddd34dfa22013d4c4d398536335f 100644 --- a/gen/http/task/client/cli.go +++ b/gen/http/task/client/cli.go @@ -16,24 +16,38 @@ import ( // BuildCreatePayload builds the payload for the task Create endpoint from CLI // flags. -func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string) (*task.CreateRequest, error) { +func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateRequest, error) { var err error var body interface{} { err = json.Unmarshal([]byte(taskCreateBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Minus reprehenderit non quo nihil adipisci.\"") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"") } } var taskName string { taskName = taskCreateTaskName } + var cacheNamespace *string + { + if taskCreateCacheNamespace != "" { + cacheNamespace = &taskCreateCacheNamespace + } + } + var cacheScope *string + { + if taskCreateCacheScope != "" { + cacheScope = &taskCreateCacheScope + } + } v := body res := &task.CreateRequest{ Data: v, } res.TaskName = taskName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope return res, nil } diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go index 5d4bfc720afba2998e07385bfbb0fdbd03324b67..52ac5bd62e0c8889272354c4336f550af8ea593f 100644 --- a/gen/http/task/client/encode_decode.go +++ b/gen/http/task/client/encode_decode.go @@ -51,6 +51,14 @@ func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http if !ok { return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v) } + if p.CacheNamespace != nil { + head := *p.CacheNamespace + req.Header.Set("x-cache-namespace", head) + } + if p.CacheScope != nil { + head := *p.CacheScope + req.Header.Set("x-cache-scope", head) + } body := p.Data if err := encoder(req).Encode(&body); err != nil { return goahttp.ErrEncodingError("task", "Create", err) diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go index 4e6ddfac7883b65925fbbe505aec623f32ee0e17..f4549af31a070fd6f67faa7273753f1475c0929b 100644 --- a/gen/http/task/server/encode_decode.go +++ b/gen/http/task/server/encode_decode.go @@ -46,12 +46,22 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. } var ( - taskName string + taskName string + cacheNamespace *string + cacheScope *string params = mux.Vars(r) ) taskName = params["taskName"] - payload := NewCreateRequest(body, taskName) + cacheNamespaceRaw := r.Header.Get("x-cache-namespace") + if cacheNamespaceRaw != "" { + cacheNamespace = &cacheNamespaceRaw + } + cacheScopeRaw := r.Header.Get("x-cache-scope") + if cacheScopeRaw != "" { + cacheScope = &cacheScopeRaw + } + payload := NewCreateRequest(body, taskName, cacheNamespace, cacheScope) return payload, nil } diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go index d0247fcb5ee7169ef9ac536d212cce8217707676..de98cf1a081580e66992b3dd65a5c5e6295d7a99 100644 --- a/gen/http/task/server/types.go +++ b/gen/http/task/server/types.go @@ -28,12 +28,14 @@ func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody { } // NewCreateRequest builds a task service Create endpoint payload. -func NewCreateRequest(body interface{}, taskName string) *task.CreateRequest { +func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateRequest { v := body res := &task.CreateRequest{ Data: v, } res.TaskName = taskName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope return res } diff --git a/gen/task/service.go b/gen/task/service.go index 6bd66d5b79088ae0d78f536af3b26e70d75d3685..13a5d28ac0ffbde98d80d9c898cdb789b16c2faa 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -33,6 +33,10 @@ type CreateRequest struct { TaskName string // Data contains JSON payload that will be used for task execution. Data interface{} + // Cache key namespace. + CacheNamespace *string + // Cache key scope. + CacheScope *string } // CreateResult is the result type of the task service Create method. diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go index b4577a6b24f7c5dfc6a4fdd45db420abaf395657..dc9033bd88e9d35fa5a4663d213dac66a81a491e 100644 --- a/internal/clients/cache/client.go +++ b/internal/clients/cache/client.go @@ -1,6 +1,7 @@ package cache import ( + "bytes" "context" "fmt" "net/http" @@ -25,10 +26,31 @@ func New(addr string, opts ...Option) *Client { return c } -func (c *Client) Set(ctx context.Context, value []byte) error { - return fmt.Errorf("not implemented") +func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error { + req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value)) + if err != nil { + return err + } + + req.Header = http.Header{ + "x-cache-key": []string{key}, + "x-cache-namespace": []string{namespace}, + "x-cache-scope": []string{scope}, + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response code: %d", resp.StatusCode) + } + + return nil } -func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { +func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) { return nil, fmt.Errorf("not implemented") } diff --git a/internal/config/config.go b/internal/config/config.go index a7a167e08afd10522e08222cb5d205563fe99be6..f68e90ba1d3c58f86e468b9b72b4e60e791d6d91 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ type Config struct { Mongo mongoConfig Policy policyConfig Executor executorConfig + Cache cacheConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -33,3 +34,7 @@ type executorConfig struct { Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` } + +type cacheConfig struct { + Addr string `envconfig:"CACHE_ADDR" required:"true"` +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 762df345f7b5c3f8fa3209266c55305f0d73665f..c59bc745e36a59664953e36bc75be0da457fc5e6 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -28,10 +28,16 @@ type Storage interface { SaveTaskHistory(ctx context.Context, task *task.Task) error } +type Cache interface { + Set(ctx context.Context, key, namespace, scope string, value []byte) error + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + type Executor struct { queue Queue policy Policy storage Storage + cache Cache workers int pollInterval time.Duration @@ -43,6 +49,7 @@ func New( queue Queue, policy Policy, storage Storage, + cache Cache, workers int, pollInterval time.Duration, httpClient *http.Client, @@ -52,6 +59,7 @@ func New( queue: queue, policy: policy, storage: storage, + cache: cache, workers: workers, pollInterval: pollInterval, httpClient: httpClient, @@ -68,7 +76,7 @@ func (e *Executor) Start(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - worker := newWorker(tasks, e.queue, e.policy, e.storage, e.httpClient, e.logger) + worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger) worker.Start(ctx) }() } diff --git a/internal/executor/worker.go b/internal/executor/worker.go index d03bdc1162a5155431f1ca6e54fee81de49cae9c..dd36b3dabe5404128292e358a83b6e937b32cf75 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -18,16 +18,26 @@ type Worker struct { queue Queue policy Policy storage Storage + cache Cache httpClient *http.Client logger *zap.Logger } -func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, storage Storage, httpClient *http.Client, logger *zap.Logger) *Worker { +func newWorker( + tasks chan *taskpkg.Task, + queue Queue, + policy Policy, + storage Storage, + cache Cache, + httpClient *http.Client, + logger *zap.Logger, +) *Worker { return &Worker{ tasks: tasks, queue: queue, policy: policy, storage: storage, + cache: cache, httpClient: httpClient, logger: logger, } @@ -55,6 +65,20 @@ func (w *Worker) Start(ctx context.Context) { continue } + if err := w.cache.Set( + ctx, + executed.ID, + executed.CacheNamespace, + executed.CacheScope, + executed.Response, + ); err != nil { + logger.Error("error storing task result in cache", zap.Error(err)) + if err := w.queue.Unack(ctx, t); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { logger.Error("error saving task history", zap.Error(err)) continue @@ -102,14 +126,16 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task // evaluate finalizer policy if task.FinalPolicy != "" { - if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil { + resp, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response) + if err != nil { return nil, errors.New("error evaluating final policy", err) } + // overwrite task response with the one returned from the policy + task.Response = resp } task.FinishedAt = time.Now() return task, nil - } func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 47a095cbb0bb79314769877587061a25c19c2853..78b713dc31d6accfbeda69bd74f5cf406e2c82ab 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -12,10 +12,12 @@ import ( goatask "code.vereign.com/gaiax/tsa/task/gen/task" ) +// Storage for retrieving predefined task templates. type Storage interface { TaskTemplate(ctx context.Context, taskName string) (*Task, error) } +// Queue interface for retrieving, returning and removing tasks from Queue. type Queue interface { Add(ctx context.Context, task *Task) error Poll(ctx context.Context) (*Task, error) @@ -29,6 +31,7 @@ type Service struct { logger *zap.Logger } +// New creates the task service. func New(template Storage, queue Queue, logger *zap.Logger) *Service { return &Service{ storage: template, @@ -37,7 +40,7 @@ func New(template Storage, queue Queue, logger *zap.Logger) *Service { } } -// Create a new task. +// Create a new task and put it in a Queue for later execution. func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { logger := s.logger.With(zap.String("taskName", req.TaskName)) @@ -45,7 +48,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * task, err := s.storage.TaskTemplate(ctx, req.TaskName) if err != nil { logger.Error("error getting task template from storage", zap.Error(err)) - return nil, errors.New("error executing task", err) + return nil, err } taskRequest, err := json.Marshal(req.Data) @@ -59,6 +62,14 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * task.CreatedAt = time.Now() task.Request = taskRequest + // if cache key namespace and scope are given, use them instead of the defaults + if req.CacheNamespace != nil && *req.CacheNamespace != "" { + task.CacheNamespace = *req.CacheNamespace + } + if req.CacheScope != nil && *req.CacheScope != "" { + task.CacheScope = *req.CacheScope + } + if err := s.queue.Add(ctx, task); err != nil { logger.Error("error adding task to queue", zap.Error(err)) return nil, errors.New("failed to create task", err) diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 78a25a4d8e6b10182d71fb0c9e9832e224a1b3bf..962a887a2dd4b954d7207216fc9612e0c494d486 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -1,6 +1,7 @@ package task import ( + "strings" "time" ) @@ -28,11 +29,12 @@ type Task struct { Method string `json:"method"` // HTTP method of the task request. Request []byte `json:"request"` // Request body which will be sent in the task request. Response []byte `json:"response"` // Response received after the task request is executed. - ResponseCode int `json:"responseCode"` // Response code received after task request is executed. - RequestPolicy string `json:"requestPolicy"` // Request policy to be executed before task request execution. - ResponsePolicy string `json:"responsePolicy"` // Response policy to be executed before the response is accepted. - FinalPolicy string `json:"finalPolicy"` // TODO(penkovski): how is different from ResponsePolicy? - CacheKey string `json:"cacheKey"` // CacheKey used for storing the response in cache. + ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed. + RequestPolicy string `json:"requestPolicy"` // RequestPolicy to be executed before task request execution. + ResponsePolicy string `json:"responsePolicy"` // ResponsePolicy to be executed on the task response. + FinalPolicy string `json:"finalPolicy"` // FinalPolicy to be executed on the task response. + CacheNamespace string `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key. + CacheScope string `json:"cacheScope"` // CacheScope if set, is used for constructing cache key. CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time. StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time. FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. @@ -48,3 +50,17 @@ type Group struct { StartTime time.Time FinishTime time.Time } + +// CacheKey constructs the key for storing task result in the cache. +func (t *Task) CacheKey() string { + key := t.ID + namespace := strings.TrimSpace(t.CacheNamespace) + scope := strings.TrimSpace(t.CacheScope) + if namespace != "" { + key += "," + namespace + } + if scope != "" { + key += "," + scope + } + return key +}