diff --git a/cmd/task/main.go b/cmd/task/main.go index 0e879c569aaa2eaa205d8c4176547c24071a670f..a349fd17c7b7cb703187fde31e9cb172592a528a 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "net" "net/http" "time" @@ -23,7 +24,10 @@ import ( goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" + "code.vereign.com/gaiax/tsa/task/internal/clients/cache" + "code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/config" + "code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" "code.vereign.com/gaiax/tsa/task/internal/service/task" @@ -62,6 +66,24 @@ func main() { // create storage storage := storage.New(db) + // create policy client + policy := policy.New(cfg.Policy.Addr, httpClient()) + + // create cache client + cache := cache.New(cfg.Cache.Addr) + + // create task executor + executor := executor.New( + storage, + policy, + storage, + cache, + cfg.Executor.Workers, + cfg.Executor.PollInterval, + httpClient(), + logger, + ) + // create services var ( taskSvc goatask.Service @@ -134,6 +156,9 @@ func main() { } return errors.New("server stopped successfully") }) + g.Go(func() error { + return executor.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } @@ -161,3 +186,19 @@ func createLogger(logLevel string, opts ...zap.Option) (*zap.Logger, error) { func errFormatter(e error) goahttp.Statuser { return service.NewErrorResponse(e) } + +func httpClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 60 * time.Second, + }, + Timeout: 30 * time.Second, + } +} diff --git a/design/design.go b/design/design.go index a733290c3bcb1c13b9af29ab2480c5d8773c3f03..0e21586fbef9ea8846c3c78c87d649076b2398c8 100644 --- a/design/design.go +++ b/design/design.go @@ -24,7 +24,16 @@ var _ = Service("task", func() { Result(CreateResult) HTTP(func() { POST("/v1/task/{taskName}") + + Header("cacheNamespace:x-cache-namespace", String, "Cache key namespace", func() { + Example("login") + }) + Header("cacheScope:x-cache-scope", String, "Cache key scope", func() { + Example("user") + }) + Body("data") + Response(StatusOK) }) }) diff --git a/design/types.go b/design/types.go index 46eb0674fd655eab128a305629756b16ad603f63..ed8662a42b4811224176ed58405dafc801b6f082 100644 --- a/design/types.go +++ b/design/types.go @@ -6,6 +6,8 @@ import . "goa.design/goa/v3/dsl" var CreateRequest = Type("CreateRequest", func() { Field(1, "taskName", String, "Task name.") Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.") + Field(3, "cacheNamespace", String, "Cache key namespace.") + Field(4, "cacheScope", String, "Cache key scope.") Required("taskName", "data") }) diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go index 248d9e39b29af21128fa47807c1d4eddba78f800..c859f0e4edee502712eaaba346baa5782937eddc 100644 --- a/gen/http/cli/task/cli.go +++ b/gen/http/cli/task/cli.go @@ -32,7 +32,7 @@ task create // UsageExamples produces an example of a valid invocation of the CLI tool. func UsageExamples() string { return os.Args[0] + ` health liveness` + "\n" + - os.Args[0] + ` task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum."` + "\n" + + os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" + "" } @@ -54,9 +54,11 @@ func ParseEndpoint( taskFlags = flag.NewFlagSet("task", flag.ContinueOnError) - taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) - taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "") - taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") + taskCreateFlags = flag.NewFlagSet("create", flag.ExitOnError) + taskCreateBodyFlag = taskCreateFlags.String("body", "REQUIRED", "") + taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") + taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "") + taskCreateCacheScopeFlag = taskCreateFlags.String("cache-scope", "", "") ) healthFlags.Usage = healthUsage healthLivenessFlags.Usage = healthLivenessUsage @@ -151,7 +153,7 @@ func ParseEndpoint( switch epn { case "create": endpoint = c.Create() - data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag) + data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag) } } } @@ -210,13 +212,15 @@ Additional help: `, os.Args[0]) } func taskCreateUsage() { - fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING + fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING -cache-namespace STRING -cache-scope STRING Create a task and put it in a queue for execution. -body JSON: -task-name STRING: Task name. + -cache-namespace STRING: + -cache-scope STRING: Example: - %[1]s task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum." + %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae." `, os.Args[0]) } diff --git a/gen/http/openapi.json b/gen/http/openapi.json index 49774f5de7bd482690963e51d82ed70d59336815..cbf1687e983ec137eb1e5d86c0eefec31be36b3d 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index 61bbc9cae68f700f4c9cede40c777ce525718f22..ca530f83c341213f3f961f435c8ecf88d4d75f6e 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -48,6 +48,16 @@ paths: description: Task name. required: true type: string + - name: x-cache-namespace + in: header + description: Cache key namespace + required: false + type: string + - name: x-cache-scope + in: header + description: Cache key scope + required: false + type: string - name: any in: body description: Data contains JSON payload that will be used for task execution. @@ -72,8 +82,8 @@ definitions: taskID: type: string description: Unique task identifier. - example: Corrupti facere sequi tempora eius assumenda molestiae. + example: Eaque excepturi suscipit veritatis nemo. example: - taskID: Laborum sapiente. + taskID: Corrupti quia autem dolorum sunt aperiam quaerat. required: - taskID diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index d579254c652ff77ebd83f7ed6833017cbb673e76..deb72810cb4565839f30f5ddb709475fc8d2bebf 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 277245235bd2acb3a2e9c016dce06063e30b1afd..8e14b7c0de5748cbd7901c7eea9745a8feb357ca 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -40,8 +40,26 @@ paths: schema: type: string description: Task name. - example: Eveniet et eligendi sint quibusdam quia maxime. - example: Et ipsa voluptate. + example: Vel odio et doloribus est quod laborum. + example: Harum aut autem aliquam dolorem non soluta. + - name: x-cache-namespace + in: header + description: Cache key namespace + allowEmptyValue: true + schema: + type: string + description: Cache key namespace + example: login + example: login + - name: x-cache-scope + in: header + description: Cache key scope + allowEmptyValue: true + schema: + type: string + description: Cache key scope + example: user + example: user requestBody: description: Data contains JSON payload that will be used for task execution. required: true @@ -50,9 +68,9 @@ paths: schema: type: string description: Data contains JSON payload that will be used for task execution. - example: Nam et. + example: Eveniet et eligendi sint quibusdam quia maxime. format: binary - example: Corrupti quia autem dolorum sunt aperiam quaerat. + example: Ipsam et est accusantium. responses: "200": description: OK response. @@ -61,7 +79,7 @@ paths: schema: $ref: '#/components/schemas/CreateResult' example: - taskID: Pariatur sequi et. + taskID: Facere quibusdam voluptate beatae. components: schemas: CreateResult: @@ -70,9 +88,9 @@ components: taskID: type: string description: Unique task identifier. - example: Facere quibusdam voluptate beatae. + example: Et ipsa voluptate. example: - taskID: Eaque excepturi suscipit veritatis nemo. + taskID: Quo qui fuga impedit eos fuga et. required: - taskID tags: diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go index 5ead7e96e6ebb3ceda6544b329fffe02c8a81a96..b6ff7fb25a49ddd34dfa22013d4c4d398536335f 100644 --- a/gen/http/task/client/cli.go +++ b/gen/http/task/client/cli.go @@ -16,24 +16,38 @@ import ( // BuildCreatePayload builds the payload for the task Create endpoint from CLI // flags. -func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string) (*task.CreateRequest, error) { +func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateRequest, error) { var err error var body interface{} { err = json.Unmarshal([]byte(taskCreateBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Minus reprehenderit non quo nihil adipisci.\"") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"") } } var taskName string { taskName = taskCreateTaskName } + var cacheNamespace *string + { + if taskCreateCacheNamespace != "" { + cacheNamespace = &taskCreateCacheNamespace + } + } + var cacheScope *string + { + if taskCreateCacheScope != "" { + cacheScope = &taskCreateCacheScope + } + } v := body res := &task.CreateRequest{ Data: v, } res.TaskName = taskName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope return res, nil } diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go index 5d4bfc720afba2998e07385bfbb0fdbd03324b67..52ac5bd62e0c8889272354c4336f550af8ea593f 100644 --- a/gen/http/task/client/encode_decode.go +++ b/gen/http/task/client/encode_decode.go @@ -51,6 +51,14 @@ func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http if !ok { return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v) } + if p.CacheNamespace != nil { + head := *p.CacheNamespace + req.Header.Set("x-cache-namespace", head) + } + if p.CacheScope != nil { + head := *p.CacheScope + req.Header.Set("x-cache-scope", head) + } body := p.Data if err := encoder(req).Encode(&body); err != nil { return goahttp.ErrEncodingError("task", "Create", err) diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go index 4e6ddfac7883b65925fbbe505aec623f32ee0e17..f4549af31a070fd6f67faa7273753f1475c0929b 100644 --- a/gen/http/task/server/encode_decode.go +++ b/gen/http/task/server/encode_decode.go @@ -46,12 +46,22 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. } var ( - taskName string + taskName string + cacheNamespace *string + cacheScope *string params = mux.Vars(r) ) taskName = params["taskName"] - payload := NewCreateRequest(body, taskName) + cacheNamespaceRaw := r.Header.Get("x-cache-namespace") + if cacheNamespaceRaw != "" { + cacheNamespace = &cacheNamespaceRaw + } + cacheScopeRaw := r.Header.Get("x-cache-scope") + if cacheScopeRaw != "" { + cacheScope = &cacheScopeRaw + } + payload := NewCreateRequest(body, taskName, cacheNamespace, cacheScope) return payload, nil } diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go index d0247fcb5ee7169ef9ac536d212cce8217707676..de98cf1a081580e66992b3dd65a5c5e6295d7a99 100644 --- a/gen/http/task/server/types.go +++ b/gen/http/task/server/types.go @@ -28,12 +28,14 @@ func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody { } // NewCreateRequest builds a task service Create endpoint payload. -func NewCreateRequest(body interface{}, taskName string) *task.CreateRequest { +func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateRequest { v := body res := &task.CreateRequest{ Data: v, } res.TaskName = taskName + res.CacheNamespace = cacheNamespace + res.CacheScope = cacheScope return res } diff --git a/gen/task/service.go b/gen/task/service.go index 6bd66d5b79088ae0d78f536af3b26e70d75d3685..13a5d28ac0ffbde98d80d9c898cdb789b16c2faa 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -33,6 +33,10 @@ type CreateRequest struct { TaskName string // Data contains JSON payload that will be used for task execution. Data interface{} + // Cache key namespace. + CacheNamespace *string + // Cache key scope. + CacheScope *string } // CreateResult is the result type of the task service Create method. diff --git a/go.mod b/go.mod index 888464d14644a144b261d14643bf32f32850e22e..208c120b1a568cbaf999a06fea637b2987e83bd5 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.17 require ( code.vereign.com/gaiax/tsa/golib v0.0.0-20220321093827-5fdf8f34aad9 + github.com/cenkalti/backoff/v4 v4.1.3 github.com/google/uuid v1.3.0 github.com/kelseyhightower/envconfig v1.4.0 go.mongodb.org/mongo-driver v1.8.4 diff --git a/go.sum b/go.sum index 3e7b80662f07e6d2f70de3c29789d73f3876be1e..88799848bf7901420915b8d03065a61acaffeb29 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go new file mode 100644 index 0000000000000000000000000000000000000000..dc9033bd88e9d35fa5a4663d213dac66a81a491e --- /dev/null +++ b/internal/clients/cache/client.go @@ -0,0 +1,56 @@ +package cache + +import ( + "bytes" + "context" + "fmt" + "net/http" +) + +// Client for the Cache service. +type Client struct { + addr string + httpClient *http.Client +} + +func New(addr string, opts ...Option) *Client { + c := &Client{ + addr: addr, + httpClient: http.DefaultClient, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error { + req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value)) + if err != nil { + return err + } + + req.Header = http.Header{ + "x-cache-key": []string{key}, + "x-cache-namespace": []string{namespace}, + "x-cache-scope": []string{scope}, + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response code: %d", resp.StatusCode) + } + + return nil +} + +func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) { + return nil, fmt.Errorf("not implemented") +} diff --git a/internal/clients/cache/option.go b/internal/clients/cache/option.go new file mode 100644 index 0000000000000000000000000000000000000000..10ef93337d96c9319a0d4dff4333024a581fa921 --- /dev/null +++ b/internal/clients/cache/option.go @@ -0,0 +1,13 @@ +package cache + +import ( + "net/http" +) + +type Option func(*Client) + +func WithHTTPClient(client *http.Client) Option { + return func(c *Client) { + c.httpClient = client + } +} diff --git a/internal/clients/policy/client.go b/internal/clients/policy/client.go new file mode 100644 index 0000000000000000000000000000000000000000..e708a73e9be04b667c31c8236a6cce6a2b4f841a --- /dev/null +++ b/internal/clients/policy/client.go @@ -0,0 +1,53 @@ +package policy + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + + "code.vereign.com/gaiax/tsa/golib/errors" +) + +type Client struct { + addr string + httpClient *http.Client +} + +func New(addr string, httpClient *http.Client) *Client { + return &Client{ + addr: addr, + httpClient: httpClient, + } +} + +// Evaluate calls the policy service to execute the given policy. +// The policy is expected as a string path uniquely identifying the +// policy that has to be evaluated. For example, with policy = `gaiax/didResolve/1.0`, +// the client will do HTTP request to http://policyhost/policy/gaiax/didResolve/1.0/evaluation. +func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) { + uri := c.addr + "/policy/" + policy + "/evaluation" + policyURL, err := url.ParseRequestURI(uri) + if err != nil { + return nil, errors.New(errors.BadRequest, "invalid policy evaluation URL", err) + } + + req, err := http.NewRequest("POST", policyURL.String(), bytes.NewReader(data)) + if err != nil { + return nil, err + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected response on policy evaluation: %d", resp.StatusCode) + } + + return io.ReadAll(resp.Body) +} diff --git a/internal/config/config.go b/internal/config/config.go index 0d0b375d7984af01183f3a1cd30fa12116a63326..f68e90ba1d3c58f86e468b9b72b4e60e791d6d91 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,8 +3,11 @@ package config import "time" type Config struct { - HTTP httpConfig - Mongo mongoConfig + HTTP httpConfig + Mongo mongoConfig + Policy policyConfig + Executor executorConfig + Cache cacheConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -22,3 +25,16 @@ type mongoConfig struct { User string `envconfig:"MONGO_USER" required:"true"` Pass string `envconfig:"MONGO_PASS" required:"true"` } + +type policyConfig struct { + Addr string `envconfig:"POLICY_ADDR" required:"true"` +} + +type executorConfig struct { + Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` +} + +type cacheConfig struct { + Addr string `envconfig:"CACHE_ADDR" required:"true"` +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go new file mode 100644 index 0000000000000000000000000000000000000000..c59bc745e36a59664953e36bc75be0da457fc5e6 --- /dev/null +++ b/internal/executor/executor.go @@ -0,0 +1,104 @@ +package executor + +import ( + "context" + "net/http" + "sync" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +// Policy client. +type Policy interface { + Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) +} + +// Queue allows retrieving, returning and deleting tasks from storage. +type Queue interface { + Poll(ctx context.Context) (*task.Task, error) + Ack(ctx context.Context, task *task.Task) error + Unack(ctx context.Context, task *task.Task) error +} + +type Storage interface { + SaveTaskHistory(ctx context.Context, task *task.Task) error +} + +type Cache interface { + Set(ctx context.Context, key, namespace, scope string, value []byte) error + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + +type Executor struct { + queue Queue + policy Policy + storage Storage + cache Cache + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New( + queue Queue, + policy Policy, + storage Storage, + cache Cache, + workers int, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *Executor { + return &Executor{ + queue: queue, + policy: policy, + storage: storage, + cache: cache, + workers: workers, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, + } +} + +func (e *Executor) Start(ctx context.Context) error { + defer e.logger.Info("task executor stopped") + + var wg sync.WaitGroup + tasks := make(chan *task.Task) + for i := 0; i < e.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger) + worker.Start(ctx) + }() + } + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(e.pollInterval): + t, err := e.queue.Poll(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + e.logger.Error("error getting task from queue", zap.Error(err)) + } + continue + } + tasks <- t // send task to the workers for execution + } + } + + wg.Wait() // wait all workers to stop + + return ctx.Err() +} diff --git a/internal/executor/worker.go b/internal/executor/worker.go new file mode 100644 index 0000000000000000000000000000000000000000..feb9d3f81eddbd82698bd4d14ca373a6572cc1e5 --- /dev/null +++ b/internal/executor/worker.go @@ -0,0 +1,166 @@ +package executor + +import ( + "bytes" + "context" + "io" + "net/http" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type Worker struct { + tasks chan *taskpkg.Task + queue Queue + policy Policy + storage Storage + cache Cache + httpClient *http.Client + logger *zap.Logger +} + +func newWorker( + tasks chan *taskpkg.Task, + queue Queue, + policy Policy, + storage Storage, + cache Cache, + httpClient *http.Client, + logger *zap.Logger, +) *Worker { + return &Worker{ + tasks: tasks, + queue: queue, + policy: policy, + storage: storage, + cache: cache, + httpClient: httpClient, + logger: logger, + } +} + +func (w *Worker) Start(ctx context.Context) { + defer w.logger.Debug("task worker stopped") + + for { + select { + case <-ctx.Done(): + return + case t := <-w.tasks: + logger := w.logger.With( + zap.String("taskID", t.ID), + zap.String("taskName", t.Name), + ) + + executed, err := w.Execute(ctx, t) + if err != nil { + logger.Error("error executing task", zap.Error(err)) + if err := w.queue.Unack(ctx, t); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + logger.Debug("task execution completed successfully") + + if err := w.cache.Set( + ctx, + executed.ID, + executed.CacheNamespace, + executed.CacheScope, + executed.Response, + ); err != nil { + logger.Error("error storing task result in cache", zap.Error(err)) + if err := w.queue.Unack(ctx, t); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + logger.Debug("task results are stored in cache") + + if err := w.storage.SaveTaskHistory(ctx, executed); err != nil { + logger.Error("error saving task history", zap.Error(err)) + continue + } + logger.Debug("task history is saved") + + // remove task from queue + if err := w.queue.Ack(ctx, executed); err != nil { + logger.Error("failed to ack task in queue", zap.Error(err)) + } + } + } +} + +func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) { + task.StartedAt = time.Now() + + var response []byte + var err error + + // task is executing a request policy OR an HTTP call to predefined URL + if task.RequestPolicy != "" { + response, err = w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + if err != nil { + return nil, errors.New("error evaluating request policy", err) + } + } else if task.URL != "" && task.Method != "" { + var status int + status, response, err = w.doHTTPTask(ctx, task) + if err != nil { + return nil, err + } + task.ResponseCode = status + } else { + return nil, errors.New(errors.Internal, "invalid task: must define either request policy or url") + } + + task.Response = response + + // evaluate response policy + if task.ResponsePolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.ResponsePolicy, task.Response) + if err != nil { + return nil, errors.New("error evaluating response policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + // evaluate finalizer policy + if task.FinalPolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response) + if err != nil { + return nil, errors.New("error evaluating final policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + task.State = taskpkg.Done + task.FinishedAt = time.Now() + return task, nil +} + +func (w *Worker) doHTTPTask(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := w.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() // nolint:errcheck + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil +} diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 47a095cbb0bb79314769877587061a25c19c2853..78b713dc31d6accfbeda69bd74f5cf406e2c82ab 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -12,10 +12,12 @@ import ( goatask "code.vereign.com/gaiax/tsa/task/gen/task" ) +// Storage for retrieving predefined task templates. type Storage interface { TaskTemplate(ctx context.Context, taskName string) (*Task, error) } +// Queue interface for retrieving, returning and removing tasks from Queue. type Queue interface { Add(ctx context.Context, task *Task) error Poll(ctx context.Context) (*Task, error) @@ -29,6 +31,7 @@ type Service struct { logger *zap.Logger } +// New creates the task service. func New(template Storage, queue Queue, logger *zap.Logger) *Service { return &Service{ storage: template, @@ -37,7 +40,7 @@ func New(template Storage, queue Queue, logger *zap.Logger) *Service { } } -// Create a new task. +// Create a new task and put it in a Queue for later execution. func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { logger := s.logger.With(zap.String("taskName", req.TaskName)) @@ -45,7 +48,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * task, err := s.storage.TaskTemplate(ctx, req.TaskName) if err != nil { logger.Error("error getting task template from storage", zap.Error(err)) - return nil, errors.New("error executing task", err) + return nil, err } taskRequest, err := json.Marshal(req.Data) @@ -59,6 +62,14 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * task.CreatedAt = time.Now() task.Request = taskRequest + // if cache key namespace and scope are given, use them instead of the defaults + if req.CacheNamespace != nil && *req.CacheNamespace != "" { + task.CacheNamespace = *req.CacheNamespace + } + if req.CacheScope != nil && *req.CacheScope != "" { + task.CacheScope = *req.CacheScope + } + if err := s.queue.Add(ctx, task); err != nil { logger.Error("error adding task to queue", zap.Error(err)) return nil, errors.New("failed to create task", err) diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 4a2a677974cbf2cf08d6143fdb59679ad8fd2869..dd53a5175c505258f3822fa0ee6e8cdd57533dd7 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -1,6 +1,9 @@ package task -import "time" +import ( + "strings" + "time" +) type State string @@ -22,15 +25,16 @@ type Task struct { GroupID string `json:"groupID"` // GroupID is set when the task is part of `task.Group`. Name string `json:"name"` // Name is used by external callers use to create tasks. State State `json:"state"` // State of the task. - URL string `json:"url"` // URL against which the task request will be executed. - Method string `json:"method"` // HTTP method of the task request. + URL string `json:"url"` // URL against which the task request will be executed (optional). + Method string `json:"method"` // HTTP method of the task request (optional). Request []byte `json:"request"` // Request body which will be sent in the task request. Response []byte `json:"response"` // Response received after the task request is executed. - ResponseCode int `json:"responseCode"` // Response code received after task request is executed. - RequestPolicy string `json:"requestPolicy"` // Request policy to be executed before task request execution. - ResponsePolicy string `json:"responsePolicy"` // Response policy to be executed before the response is accepted. - FinalPolicy string `json:"finalPolicy"` // TODO(penkovski): how is different from ResponsePolicy? - CacheKey string `json:"cacheKey"` // CacheKey used for storing the response in cache. + ResponseCode int `json:"responseCode"` // ResponseCode received after task request is executed. + RequestPolicy string `json:"requestPolicy"` // RequestPolicy to be executed before task request execution. + ResponsePolicy string `json:"responsePolicy"` // ResponsePolicy to be executed on the task response. + FinalPolicy string `json:"finalPolicy"` // FinalPolicy to be executed on the task response. + CacheNamespace string `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key. + CacheScope string `json:"cacheScope"` // CacheScope if set, is used for constructing cache key. CreatedAt time.Time `json:"createdAt"` // CreatedAt specifies task creation time. StartedAt time.Time `json:"startedAt"` // StartedAt specifies task execution start time. FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. @@ -46,3 +50,17 @@ type Group struct { StartTime time.Time FinishTime time.Time } + +// CacheKey constructs the key for storing task result in the cache. +func (t *Task) CacheKey() string { + key := t.ID + namespace := strings.TrimSpace(t.CacheNamespace) + scope := strings.TrimSpace(t.CacheScope) + if namespace != "" { + key += "," + namespace + } + if scope != "" { + key += "," + scope + } + return key +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 09bb73956b7dbe7308121ff9e890ecf1354fc9bb..0e314d761b3cf3ce6f95a2ae959b180a6f7a6c54 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -4,6 +4,7 @@ import ( "context" "strings" + "github.com/cenkalti/backoff/v4" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -16,17 +17,20 @@ const ( taskDB = "task" taskTemplates = "taskTemplates" taskQueue = "tasks" + tasksHistory = "tasksHistory" ) type Storage struct { - templates *mongo.Collection - tasks *mongo.Collection + templates *mongo.Collection + tasks *mongo.Collection + tasksHistory *mongo.Collection } func New(db *mongo.Client) *Storage { return &Storage{ - templates: db.Database(taskDB).Collection(taskTemplates), - tasks: db.Database(taskDB).Collection(taskQueue), + templates: db.Database(taskDB).Collection(taskTemplates), + tasks: db.Database(taskDB).Collection(taskQueue), + tasksHistory: db.Database(taskDB).Collection(tasksHistory), } } @@ -89,7 +93,7 @@ func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { return &task, nil } -// Ack removes a task from the tasks collection. +// Ack removes a task from the `tasks` collection. func (s *Storage) Ack(ctx context.Context, task *task.Task) error { _, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID}) return err @@ -103,3 +107,14 @@ func (s *Storage) Unack(ctx context.Context, t *task.Task) error { _, err := s.tasks.UpdateOne(ctx, filter, update) return err } + +// SaveTaskHistory saves a task to the `tasksHistory` collection. +func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error { + insert := func() error { + _, err := s.tasksHistory.InsertOne(ctx, task) + return err + } + + b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + return backoff.Retry(insert, b) +} diff --git a/vendor/github.com/cenkalti/backoff/v4/.gitignore b/vendor/github.com/cenkalti/backoff/v4/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..50d95c548b67069b52ea4dcf91bf85314d488d41 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/.gitignore differ diff --git a/vendor/github.com/cenkalti/backoff/v4/.travis.yml b/vendor/github.com/cenkalti/backoff/v4/.travis.yml new file mode 100644 index 0000000000000000000000000000000000000000..c79105c2fbebe29ea2a4d735de58526f58a06815 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/.travis.yml differ diff --git a/vendor/github.com/cenkalti/backoff/v4/LICENSE b/vendor/github.com/cenkalti/backoff/v4/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..89b8179965581339743b5edd9e18c1fcc778b56d Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/LICENSE differ diff --git a/vendor/github.com/cenkalti/backoff/v4/README.md b/vendor/github.com/cenkalti/backoff/v4/README.md new file mode 100644 index 0000000000000000000000000000000000000000..16abdfc084f73cb6658c2ddf463ac02ff666dd6e Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/README.md differ diff --git a/vendor/github.com/cenkalti/backoff/v4/backoff.go b/vendor/github.com/cenkalti/backoff/v4/backoff.go new file mode 100644 index 0000000000000000000000000000000000000000..3676ee405d87b3dc7b675f3c93f719a2abca12d2 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/backoff.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/context.go b/vendor/github.com/cenkalti/backoff/v4/context.go new file mode 100644 index 0000000000000000000000000000000000000000..48482330eb763651b2217b2704de5903b155d8d2 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/context.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/exponential.go b/vendor/github.com/cenkalti/backoff/v4/exponential.go new file mode 100644 index 0000000000000000000000000000000000000000..2c56c1e7189ba605ae668e752b3781f815b1bbd1 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/exponential.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/retry.go b/vendor/github.com/cenkalti/backoff/v4/retry.go new file mode 100644 index 0000000000000000000000000000000000000000..1ce2507ebc8be550a89b07d84e58f0fc55a50ee6 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/retry.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/ticker.go b/vendor/github.com/cenkalti/backoff/v4/ticker.go new file mode 100644 index 0000000000000000000000000000000000000000..df9d68bce527f2952dd8a6b4569fff26b6c214c2 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/ticker.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/timer.go b/vendor/github.com/cenkalti/backoff/v4/timer.go new file mode 100644 index 0000000000000000000000000000000000000000..8120d0213c58d550ea7cf20ab4d2329134abb12a Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/timer.go differ diff --git a/vendor/github.com/cenkalti/backoff/v4/tries.go b/vendor/github.com/cenkalti/backoff/v4/tries.go new file mode 100644 index 0000000000000000000000000000000000000000..28d58ca37c684020e7db4e1f5a1394cc4b4edef9 Binary files /dev/null and b/vendor/github.com/cenkalti/backoff/v4/tries.go differ diff --git a/vendor/modules.txt b/vendor/modules.txt index cc125ed7892e00bda7f373537e628ab49a99401a..e4d7aa31821205e1e8eb7e7920a4d8185d08933e 100644 Binary files a/vendor/modules.txt and b/vendor/modules.txt differ