Skip to content
Snippets Groups Projects
Commit 0d894d1d authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Set task results in cache

parent d3ce155d
Branches
Tags
1 merge request!4Task execution of independent tasks
Pipeline #50638 passed
Showing
with 211 additions and 37 deletions
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server"
"code.vereign.com/gaiax/tsa/task/gen/openapi" "code.vereign.com/gaiax/tsa/task/gen/openapi"
goatask "code.vereign.com/gaiax/tsa/task/gen/task" goatask "code.vereign.com/gaiax/tsa/task/gen/task"
"code.vereign.com/gaiax/tsa/task/internal/clients/cache"
"code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/clients/policy"
"code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/config"
"code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/executor"
...@@ -68,11 +69,15 @@ func main() { ...@@ -68,11 +69,15 @@ func main() {
// create policy client // create policy client
policy := policy.New(cfg.Policy.Addr, httpClient()) policy := policy.New(cfg.Policy.Addr, httpClient())
// create cache client
cache := cache.New(cfg.Cache.Addr)
// create task executor // create task executor
executor := executor.New( executor := executor.New(
storage, storage,
policy, policy,
storage, storage,
cache,
cfg.Executor.Workers, cfg.Executor.Workers,
cfg.Executor.PollInterval, cfg.Executor.PollInterval,
httpClient(), httpClient(),
......
...@@ -24,7 +24,16 @@ var _ = Service("task", func() { ...@@ -24,7 +24,16 @@ var _ = Service("task", func() {
Result(CreateResult) Result(CreateResult)
HTTP(func() { HTTP(func() {
POST("/v1/task/{taskName}") POST("/v1/task/{taskName}")
Header("cacheNamespace:x-cache-namespace", String, "Cache key namespace", func() {
Example("login")
})
Header("cacheScope:x-cache-scope", String, "Cache key scope", func() {
Example("user")
})
Body("data") Body("data")
Response(StatusOK) Response(StatusOK)
}) })
}) })
......
...@@ -6,6 +6,8 @@ import . "goa.design/goa/v3/dsl" ...@@ -6,6 +6,8 @@ import . "goa.design/goa/v3/dsl"
var CreateRequest = Type("CreateRequest", func() { var CreateRequest = Type("CreateRequest", func() {
Field(1, "taskName", String, "Task name.") Field(1, "taskName", String, "Task name.")
Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.") Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.")
Field(3, "cacheNamespace", String, "Cache key namespace.")
Field(4, "cacheScope", String, "Cache key scope.")
Required("taskName", "data") Required("taskName", "data")
}) })
......
...@@ -32,7 +32,7 @@ task create ...@@ -32,7 +32,7 @@ task create
// UsageExamples produces an example of a valid invocation of the CLI tool. // UsageExamples produces an example of a valid invocation of the CLI tool.
func UsageExamples() string { func UsageExamples() string {
return os.Args[0] + ` health liveness` + "\n" + return os.Args[0] + ` health liveness` + "\n" +
os.Args[0] + ` task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum."` + "\n" + os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" +
"" ""
} }
...@@ -54,9 +54,11 @@ func ParseEndpoint( ...@@ -54,9 +54,11 @@ func ParseEndpoint(
taskFlags = flag.NewFlagSet("task", flag.ContinueOnError) taskFlags = flag.NewFlagSet("task", flag.ContinueOnError)
taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError)
taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "") taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "")
taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.")
taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "")
taskCreateCacheScopeFlag = taskCreateFlags.String("cache-scope", "", "")
) )
healthFlags.Usage = healthUsage healthFlags.Usage = healthUsage
healthLivenessFlags.Usage = healthLivenessUsage healthLivenessFlags.Usage = healthLivenessUsage
...@@ -151,7 +153,7 @@ func ParseEndpoint( ...@@ -151,7 +153,7 @@ func ParseEndpoint(
switch epn { switch epn {
case "create": case "create":
endpoint = c.Create() endpoint = c.Create()
data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag) data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag)
} }
} }
} }
...@@ -210,13 +212,15 @@ Additional help: ...@@ -210,13 +212,15 @@ Additional help:
`, os.Args[0]) `, os.Args[0])
} }
func taskCreateUsage() { func taskCreateUsage() {
fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING -cache-namespace STRING -cache-scope STRING
Create a task and put it in a queue for execution. Create a task and put it in a queue for execution.
-body JSON: -body JSON:
-task-name STRING: Task name. -task-name STRING: Task name.
-cache-namespace STRING:
-cache-scope STRING:
Example: Example:
%[1]s task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum." %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."
`, os.Args[0]) `, os.Args[0])
} }
{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}} {"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}}
\ No newline at end of file \ No newline at end of file
...@@ -48,6 +48,16 @@ paths: ...@@ -48,6 +48,16 @@ paths:
description: Task name. description: Task name.
required: true required: true
type: string type: string
- name: x-cache-namespace
in: header
description: Cache key namespace
required: false
type: string
- name: x-cache-scope
in: header
description: Cache key scope
required: false
type: string
- name: any - name: any
in: body in: body
description: Data contains JSON payload that will be used for task execution. description: Data contains JSON payload that will be used for task execution.
...@@ -72,8 +82,8 @@ definitions: ...@@ -72,8 +82,8 @@ definitions:
taskID: taskID:
type: string type: string
description: Unique task identifier. description: Unique task identifier.
example: Corrupti facere sequi tempora eius assumenda molestiae. example: Eaque excepturi suscipit veritatis nemo.
example: example:
taskID: Laborum sapiente. taskID: Corrupti quia autem dolorum sunt aperiam quaerat.
required: required:
- taskID - taskID
{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} {"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]}
\ No newline at end of file \ No newline at end of file
...@@ -40,8 +40,26 @@ paths: ...@@ -40,8 +40,26 @@ paths:
schema: schema:
type: string type: string
description: Task name. description: Task name.
example: Eveniet et eligendi sint quibusdam quia maxime. example: Vel odio et doloribus est quod laborum.
example: Et ipsa voluptate. example: Harum aut autem aliquam dolorem non soluta.
- name: x-cache-namespace
in: header
description: Cache key namespace
allowEmptyValue: true
schema:
type: string
description: Cache key namespace
example: login
example: login
- name: x-cache-scope
in: header
description: Cache key scope
allowEmptyValue: true
schema:
type: string
description: Cache key scope
example: user
example: user
requestBody: requestBody:
description: Data contains JSON payload that will be used for task execution. description: Data contains JSON payload that will be used for task execution.
required: true required: true
...@@ -50,9 +68,9 @@ paths: ...@@ -50,9 +68,9 @@ paths:
schema: schema:
type: string type: string
description: Data contains JSON payload that will be used for task execution. description: Data contains JSON payload that will be used for task execution.
example: Nam et. example: Eveniet et eligendi sint quibusdam quia maxime.
format: binary format: binary
example: Corrupti quia autem dolorum sunt aperiam quaerat. example: Ipsam et est accusantium.
responses: responses:
"200": "200":
description: OK response. description: OK response.
...@@ -61,7 +79,7 @@ paths: ...@@ -61,7 +79,7 @@ paths:
schema: schema:
$ref: '#/components/schemas/CreateResult' $ref: '#/components/schemas/CreateResult'
example: example:
taskID: Pariatur sequi et. taskID: Facere quibusdam voluptate beatae.
components: components:
schemas: schemas:
CreateResult: CreateResult:
...@@ -70,9 +88,9 @@ components: ...@@ -70,9 +88,9 @@ components:
taskID: taskID:
type: string type: string
description: Unique task identifier. description: Unique task identifier.
example: Facere quibusdam voluptate beatae. example: Et ipsa voluptate.
example: example:
taskID: Eaque excepturi suscipit veritatis nemo. taskID: Quo qui fuga impedit eos fuga et.
required: required:
- taskID - taskID
tags: tags:
......
...@@ -16,24 +16,38 @@ import ( ...@@ -16,24 +16,38 @@ import (
// BuildCreatePayload builds the payload for the task Create endpoint from CLI // BuildCreatePayload builds the payload for the task Create endpoint from CLI
// flags. // flags.
func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string) (*task.CreateRequest, error) { func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateRequest, error) {
var err error var err error
var body interface{} var body interface{}
{ {
err = json.Unmarshal([]byte(taskCreateBody), &body) err = json.Unmarshal([]byte(taskCreateBody), &body)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Minus reprehenderit non quo nihil adipisci.\"") return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"")
} }
} }
var taskName string var taskName string
{ {
taskName = taskCreateTaskName taskName = taskCreateTaskName
} }
var cacheNamespace *string
{
if taskCreateCacheNamespace != "" {
cacheNamespace = &taskCreateCacheNamespace
}
}
var cacheScope *string
{
if taskCreateCacheScope != "" {
cacheScope = &taskCreateCacheScope
}
}
v := body v := body
res := &task.CreateRequest{ res := &task.CreateRequest{
Data: v, Data: v,
} }
res.TaskName = taskName res.TaskName = taskName
res.CacheNamespace = cacheNamespace
res.CacheScope = cacheScope
return res, nil return res, nil
} }
...@@ -51,6 +51,14 @@ func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http ...@@ -51,6 +51,14 @@ func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http
if !ok { if !ok {
return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v) return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v)
} }
if p.CacheNamespace != nil {
head := *p.CacheNamespace
req.Header.Set("x-cache-namespace", head)
}
if p.CacheScope != nil {
head := *p.CacheScope
req.Header.Set("x-cache-scope", head)
}
body := p.Data body := p.Data
if err := encoder(req).Encode(&body); err != nil { if err := encoder(req).Encode(&body); err != nil {
return goahttp.ErrEncodingError("task", "Create", err) return goahttp.ErrEncodingError("task", "Create", err)
......
...@@ -46,12 +46,22 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. ...@@ -46,12 +46,22 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp.
} }
var ( var (
taskName string taskName string
cacheNamespace *string
cacheScope *string
params = mux.Vars(r) params = mux.Vars(r)
) )
taskName = params["taskName"] taskName = params["taskName"]
payload := NewCreateRequest(body, taskName) cacheNamespaceRaw := r.Header.Get("x-cache-namespace")
if cacheNamespaceRaw != "" {
cacheNamespace = &cacheNamespaceRaw
}
cacheScopeRaw := r.Header.Get("x-cache-scope")
if cacheScopeRaw != "" {
cacheScope = &cacheScopeRaw
}
payload := NewCreateRequest(body, taskName, cacheNamespace, cacheScope)
return payload, nil return payload, nil
} }
......
...@@ -28,12 +28,14 @@ func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody { ...@@ -28,12 +28,14 @@ func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody {
} }
// NewCreateRequest builds a task service Create endpoint payload. // NewCreateRequest builds a task service Create endpoint payload.
func NewCreateRequest(body interface{}, taskName string) *task.CreateRequest { func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateRequest {
v := body v := body
res := &task.CreateRequest{ res := &task.CreateRequest{
Data: v, Data: v,
} }
res.TaskName = taskName res.TaskName = taskName
res.CacheNamespace = cacheNamespace
res.CacheScope = cacheScope
return res return res
} }
...@@ -33,6 +33,10 @@ type CreateRequest struct { ...@@ -33,6 +33,10 @@ type CreateRequest struct {
TaskName string TaskName string
// Data contains JSON payload that will be used for task execution. // Data contains JSON payload that will be used for task execution.
Data interface{} Data interface{}
// Cache key namespace.
CacheNamespace *string
// Cache key scope.
CacheScope *string
} }
// CreateResult is the result type of the task service Create method. // CreateResult is the result type of the task service Create method.
......
package cache package cache
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
...@@ -25,10 +26,31 @@ func New(addr string, opts ...Option) *Client { ...@@ -25,10 +26,31 @@ func New(addr string, opts ...Option) *Client {
return c return c
} }
func (c *Client) Set(ctx context.Context, value []byte) error { func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error {
return fmt.Errorf("not implemented") req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value))
if err != nil {
return err
}
req.Header = http.Header{
"x-cache-key": []string{key},
"x-cache-namespace": []string{namespace},
"x-cache-scope": []string{scope},
}
resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close() // nolint:errcheck
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response code: %d", resp.StatusCode)
}
return nil
} }
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
...@@ -7,6 +7,7 @@ type Config struct { ...@@ -7,6 +7,7 @@ type Config struct {
Mongo mongoConfig Mongo mongoConfig
Policy policyConfig Policy policyConfig
Executor executorConfig Executor executorConfig
Cache cacheConfig
LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
} }
...@@ -33,3 +34,7 @@ type executorConfig struct { ...@@ -33,3 +34,7 @@ type executorConfig struct {
Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"`
PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
} }
type cacheConfig struct {
Addr string `envconfig:"CACHE_ADDR" required:"true"`
}
...@@ -28,10 +28,16 @@ type Storage interface { ...@@ -28,10 +28,16 @@ type Storage interface {
SaveTaskHistory(ctx context.Context, task *task.Task) error SaveTaskHistory(ctx context.Context, task *task.Task) error
} }
type Cache interface {
Set(ctx context.Context, key, namespace, scope string, value []byte) error
Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
}
type Executor struct { type Executor struct {
queue Queue queue Queue
policy Policy policy Policy
storage Storage storage Storage
cache Cache
workers int workers int
pollInterval time.Duration pollInterval time.Duration
...@@ -43,6 +49,7 @@ func New( ...@@ -43,6 +49,7 @@ func New(
queue Queue, queue Queue,
policy Policy, policy Policy,
storage Storage, storage Storage,
cache Cache,
workers int, workers int,
pollInterval time.Duration, pollInterval time.Duration,
httpClient *http.Client, httpClient *http.Client,
...@@ -52,6 +59,7 @@ func New( ...@@ -52,6 +59,7 @@ func New(
queue: queue, queue: queue,
policy: policy, policy: policy,
storage: storage, storage: storage,
cache: cache,
workers: workers, workers: workers,
pollInterval: pollInterval, pollInterval: pollInterval,
httpClient: httpClient, httpClient: httpClient,
...@@ -68,7 +76,7 @@ func (e *Executor) Start(ctx context.Context) error { ...@@ -68,7 +76,7 @@ func (e *Executor) Start(ctx context.Context) error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
worker := newWorker(tasks, e.queue, e.policy, e.storage, e.httpClient, e.logger) worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger)
worker.Start(ctx) worker.Start(ctx)
}() }()
} }
......
...@@ -18,16 +18,26 @@ type Worker struct { ...@@ -18,16 +18,26 @@ type Worker struct {
queue Queue queue Queue
policy Policy policy Policy
storage Storage storage Storage
cache Cache
httpClient *http.Client httpClient *http.Client
logger *zap.Logger logger *zap.Logger
} }
func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, storage Storage, httpClient *http.Client, logger *zap.Logger) *Worker { func newWorker(
tasks chan *taskpkg.Task,
queue Queue,
policy Policy,
storage Storage,
cache Cache,
httpClient *http.Client,
logger *zap.Logger,
) *Worker {
return &Worker{ return &Worker{
tasks: tasks, tasks: tasks,
queue: queue, queue: queue,
policy: policy, policy: policy,
storage: storage, storage: storage,
cache: cache,
httpClient: httpClient, httpClient: httpClient,
logger: logger, logger: logger,
} }
...@@ -55,6 +65,20 @@ func (w *Worker) Start(ctx context.Context) { ...@@ -55,6 +65,20 @@ func (w *Worker) Start(ctx context.Context) {
continue continue
} }
if err := w.cache.Set(
ctx,
executed.ID,
executed.CacheNamespace,
executed.CacheScope,
executed.Response,
); err != nil {
logger.Error("error storing task result in cache", zap.Error(err))
if err := w.queue.Unack(ctx, t); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}
if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { if err := w.storage.SaveTaskHistory(ctx, executed); err != nil {
logger.Error("error saving task history", zap.Error(err)) logger.Error("error saving task history", zap.Error(err))
continue continue
...@@ -102,14 +126,16 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task ...@@ -102,14 +126,16 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task
// evaluate finalizer policy // evaluate finalizer policy
if task.FinalPolicy != "" { if task.FinalPolicy != "" {
if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil { resp, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response)
if err != nil {
return nil, errors.New("error evaluating final policy", err) return nil, errors.New("error evaluating final policy", err)
} }
// overwrite task response with the one returned from the policy
task.Response = resp
} }
task.FinishedAt = time.Now() task.FinishedAt = time.Now()
return task, nil return task, nil
} }
func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
......
...@@ -12,10 +12,12 @@ import ( ...@@ -12,10 +12,12 @@ import (
goatask "code.vereign.com/gaiax/tsa/task/gen/task" goatask "code.vereign.com/gaiax/tsa/task/gen/task"
) )
// Storage for retrieving predefined task templates.
type Storage interface { type Storage interface {
TaskTemplate(ctx context.Context, taskName string) (*Task, error) TaskTemplate(ctx context.Context, taskName string) (*Task, error)
} }
// Queue interface for retrieving, returning and removing tasks from Queue.
type Queue interface { type Queue interface {
Add(ctx context.Context, task *Task) error Add(ctx context.Context, task *Task) error
Poll(ctx context.Context) (*Task, error) Poll(ctx context.Context) (*Task, error)
...@@ -29,6 +31,7 @@ type Service struct { ...@@ -29,6 +31,7 @@ type Service struct {
logger *zap.Logger logger *zap.Logger
} }
// New creates the task service.
func New(template Storage, queue Queue, logger *zap.Logger) *Service { func New(template Storage, queue Queue, logger *zap.Logger) *Service {
return &Service{ return &Service{
storage: template, storage: template,
...@@ -37,7 +40,7 @@ func New(template Storage, queue Queue, logger *zap.Logger) *Service { ...@@ -37,7 +40,7 @@ func New(template Storage, queue Queue, logger *zap.Logger) *Service {
} }
} }
// Create a new task. // Create a new task and put it in a Queue for later execution.
func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) {
logger := s.logger.With(zap.String("taskName", req.TaskName)) logger := s.logger.With(zap.String("taskName", req.TaskName))
...@@ -45,7 +48,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * ...@@ -45,7 +48,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
task, err := s.storage.TaskTemplate(ctx, req.TaskName) task, err := s.storage.TaskTemplate(ctx, req.TaskName)
if err != nil { if err != nil {
logger.Error("error getting task template from storage", zap.Error(err)) logger.Error("error getting task template from storage", zap.Error(err))
return nil, errors.New("error executing task", err) return nil, err
} }
taskRequest, err := json.Marshal(req.Data) taskRequest, err := json.Marshal(req.Data)
...@@ -59,6 +62,14 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * ...@@ -59,6 +62,14 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
task.CreatedAt = time.Now() task.CreatedAt = time.Now()
task.Request = taskRequest task.Request = taskRequest
// if cache key namespace and scope are given, use them instead of the defaults
if req.CacheNamespace != nil && *req.CacheNamespace != "" {
task.CacheNamespace = *req.CacheNamespace
}
if req.CacheScope != nil && *req.CacheScope != "" {
task.CacheScope = *req.CacheScope
}
if err := s.queue.Add(ctx, task); err != nil { if err := s.queue.Add(ctx, task); err != nil {
logger.Error("error adding task to queue", zap.Error(err)) logger.Error("error adding task to queue", zap.Error(err))
return nil, errors.New("failed to create task", err) return nil, errors.New("failed to create task", err)
......
package task package task
import ( import (
"strings"
"time" "time"
) )
...@@ -28,11 +29,12 @@ type Task struct { ...@@ -28,11 +29,12 @@ type Task struct {
Method string `json:"method"` // HTTP method of the task request. Method string `json:"method"` // HTTP method of the task request.
Request []byte `json:"request"` // Request body which will be sent in the task request. Request []byte `json:"request"` // Request body which will be sent in the task request.
Response []byte `json:"response"` // Response received after the task request is executed. Response []byte `json:"response"` // Response received after the task request is executed.
ResponseCode int `json:"responseCode"` // Response code received after task request is executed. ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed.
RequestPolicy string `json:"requestPolicy"` // Request policy to be executed before task request execution. RequestPolicy string `json:"requestPolicy"` // RequestPolicy to be executed before task request execution.
ResponsePolicy string `json:"responsePolicy"` // Response policy to be executed before the response is accepted. ResponsePolicy string `json:"responsePolicy"` // ResponsePolicy to be executed on the task response.
FinalPolicy string `json:"finalPolicy"` // TODO(penkovski): how is different from ResponsePolicy? FinalPolicy string `json:"finalPolicy"` // FinalPolicy to be executed on the task response.
CacheKey string `json:"cacheKey"` // CacheKey used for storing the response in cache. CacheNamespace string `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key.
CacheScope string `json:"cacheScope"` // CacheScope if set, is used for constructing cache key.
CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time. CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time.
StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time. StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time.
FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done.
...@@ -48,3 +50,17 @@ type Group struct { ...@@ -48,3 +50,17 @@ type Group struct {
StartTime time.Time StartTime time.Time
FinishTime time.Time FinishTime time.Time
} }
// CacheKey constructs the key for storing task result in the cache.
func (t *Task) CacheKey() string {
key := t.ID
namespace := strings.TrimSpace(t.CacheNamespace)
scope := strings.TrimSpace(t.CacheScope)
if namespace != "" {
key += "," + namespace
}
if scope != "" {
key += "," + scope
}
return key
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment