From 0d894d1d00f8df46f0dabd59ff1e23c75c10db8b Mon Sep 17 00:00:00 2001
From: Lyuben Penkovski <lyuben.penkovski@vereign.com>
Date: Mon, 9 May 2022 12:13:56 +0300
Subject: [PATCH] Set task results in cache

---
 cmd/task/main.go                      |  5 +++++
 design/design.go                      |  9 ++++++++
 design/types.go                       |  2 ++
 gen/http/cli/task/cli.go              | 18 +++++++++------
 gen/http/openapi.json                 |  2 +-
 gen/http/openapi.yaml                 | 14 ++++++++++--
 gen/http/openapi3.json                |  2 +-
 gen/http/openapi3.yaml                | 32 +++++++++++++++++++++------
 gen/http/task/client/cli.go           | 18 +++++++++++++--
 gen/http/task/client/encode_decode.go |  8 +++++++
 gen/http/task/server/encode_decode.go | 14 ++++++++++--
 gen/http/task/server/types.go         |  4 +++-
 gen/task/service.go                   |  4 ++++
 internal/clients/cache/client.go      | 28 ++++++++++++++++++++---
 internal/config/config.go             |  5 +++++
 internal/executor/executor.go         | 10 ++++++++-
 internal/executor/worker.go           | 32 ++++++++++++++++++++++++---
 internal/service/task/service.go      | 15 +++++++++++--
 internal/service/task/task.go         | 26 +++++++++++++++++-----
 19 files changed, 211 insertions(+), 37 deletions(-)

diff --git a/cmd/task/main.go b/cmd/task/main.go
index d293aa6..a349fd1 100644
--- a/cmd/task/main.go
+++ b/cmd/task/main.go
@@ -24,6 +24,7 @@ import (
 	goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server"
 	"code.vereign.com/gaiax/tsa/task/gen/openapi"
 	goatask "code.vereign.com/gaiax/tsa/task/gen/task"
+	"code.vereign.com/gaiax/tsa/task/internal/clients/cache"
 	"code.vereign.com/gaiax/tsa/task/internal/clients/policy"
 	"code.vereign.com/gaiax/tsa/task/internal/config"
 	"code.vereign.com/gaiax/tsa/task/internal/executor"
@@ -68,11 +69,15 @@ func main() {
 	// create policy client
 	policy := policy.New(cfg.Policy.Addr, httpClient())
 
+	// create cache client
+	cache := cache.New(cfg.Cache.Addr)
+
 	// create task executor
 	executor := executor.New(
 		storage,
 		policy,
 		storage,
+		cache,
 		cfg.Executor.Workers,
 		cfg.Executor.PollInterval,
 		httpClient(),
diff --git a/design/design.go b/design/design.go
index a733290..0e21586 100644
--- a/design/design.go
+++ b/design/design.go
@@ -24,7 +24,16 @@ var _ = Service("task", func() {
 		Result(CreateResult)
 		HTTP(func() {
 			POST("/v1/task/{taskName}")
+
+			Header("cacheNamespace:x-cache-namespace", String, "Cache key namespace", func() {
+				Example("login")
+			})
+			Header("cacheScope:x-cache-scope", String, "Cache key scope", func() {
+				Example("user")
+			})
+
 			Body("data")
+
 			Response(StatusOK)
 		})
 	})
diff --git a/design/types.go b/design/types.go
index 46eb067..ed8662a 100644
--- a/design/types.go
+++ b/design/types.go
@@ -6,6 +6,8 @@ import . "goa.design/goa/v3/dsl"
 var CreateRequest = Type("CreateRequest", func() {
 	Field(1, "taskName", String, "Task name.")
 	Field(2, "data", Any, "Data contains JSON payload that will be used for task execution.")
+	Field(3, "cacheNamespace", String, "Cache key namespace.")
+	Field(4, "cacheScope", String, "Cache key scope.")
 	Required("taskName", "data")
 })
 
diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go
index 248d9e3..c859f0e 100644
--- a/gen/http/cli/task/cli.go
+++ b/gen/http/cli/task/cli.go
@@ -32,7 +32,7 @@ task create
 // UsageExamples produces an example of a valid invocation of the CLI tool.
 func UsageExamples() string {
 	return os.Args[0] + ` health liveness` + "\n" +
-		os.Args[0] + ` task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum."` + "\n" +
+		os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" +
 		""
 }
 
@@ -54,9 +54,11 @@ func ParseEndpoint(
 
 		taskFlags = flag.NewFlagSet("task", flag.ContinueOnError)
 
-		taskCreateFlags        = flag.NewFlagSet("create", flag.ExitOnError)
-		taskCreateBodyFlag     = taskCreateFlags.String("body", "REQUIRED", "")
-		taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.")
+		taskCreateFlags              = flag.NewFlagSet("create", flag.ExitOnError)
+		taskCreateBodyFlag           = taskCreateFlags.String("body", "REQUIRED", "")
+		taskCreateTaskNameFlag       = taskCreateFlags.String("task-name", "REQUIRED", "Task name.")
+		taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "")
+		taskCreateCacheScopeFlag     = taskCreateFlags.String("cache-scope", "", "")
 	)
 	healthFlags.Usage = healthUsage
 	healthLivenessFlags.Usage = healthLivenessUsage
@@ -151,7 +153,7 @@ func ParseEndpoint(
 			switch epn {
 			case "create":
 				endpoint = c.Create()
-				data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag)
+				data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag)
 			}
 		}
 	}
@@ -210,13 +212,15 @@ Additional help:
 `, os.Args[0])
 }
 func taskCreateUsage() {
-	fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING
+	fmt.Fprintf(os.Stderr, `%[1]s [flags] task create -body JSON -task-name STRING -cache-namespace STRING -cache-scope STRING
 
 Create a task and put it in a queue for execution.
     -body JSON: 
     -task-name STRING: Task name.
+    -cache-namespace STRING: 
+    -cache-scope STRING: 
 
 Example:
-    %[1]s task create --body "Minus reprehenderit non quo nihil adipisci." --task-name "Omnis illum rerum."
+    %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."
 `, os.Args[0])
 }
diff --git a/gen/http/openapi.json b/gen/http/openapi.json
index 49774f5..cbf1687 100644
--- a/gen/http/openapi.json
+++ b/gen/http/openapi.json
@@ -1 +1 @@
-{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Corrupti facere sequi tempora eius assumenda molestiae."}},"example":{"taskID":"Laborum sapiente."},"required":["taskID"]}}}
\ No newline at end of file
+{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}}
\ No newline at end of file
diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml
index 61bbc9c..ca530f8 100644
--- a/gen/http/openapi.yaml
+++ b/gen/http/openapi.yaml
@@ -48,6 +48,16 @@ paths:
         description: Task name.
         required: true
         type: string
+      - name: x-cache-namespace
+        in: header
+        description: Cache key namespace
+        required: false
+        type: string
+      - name: x-cache-scope
+        in: header
+        description: Cache key scope
+        required: false
+        type: string
       - name: any
         in: body
         description: Data contains JSON payload that will be used for task execution.
@@ -72,8 +82,8 @@ definitions:
       taskID:
         type: string
         description: Unique task identifier.
-        example: Corrupti facere sequi tempora eius assumenda molestiae.
+        example: Eaque excepturi suscipit veritatis nemo.
     example:
-      taskID: Laborum sapiente.
+      taskID: Corrupti quia autem dolorum sunt aperiam quaerat.
     required:
     - taskID
diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json
index d579254..deb7281 100644
--- a/gen/http/openapi3.json
+++ b/gen/http/openapi3.json
@@ -1 +1 @@
-{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Eveniet et eligendi sint quibusdam quia maxime."},"example":"Et ipsa voluptate."}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Nam et.","format":"binary"},"example":"Corrupti quia autem dolorum sunt aperiam quaerat."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Pariatur sequi et."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Facere quibusdam voluptate beatae."}},"example":{"taskID":"Eaque excepturi suscipit veritatis nemo."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]}
\ No newline at end of file
+{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]}
\ No newline at end of file
diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml
index 2772452..8e14b7c 100644
--- a/gen/http/openapi3.yaml
+++ b/gen/http/openapi3.yaml
@@ -40,8 +40,26 @@ paths:
         schema:
           type: string
           description: Task name.
-          example: Eveniet et eligendi sint quibusdam quia maxime.
-        example: Et ipsa voluptate.
+          example: Vel odio et doloribus est quod laborum.
+        example: Harum aut autem aliquam dolorem non soluta.
+      - name: x-cache-namespace
+        in: header
+        description: Cache key namespace
+        allowEmptyValue: true
+        schema:
+          type: string
+          description: Cache key namespace
+          example: login
+        example: login
+      - name: x-cache-scope
+        in: header
+        description: Cache key scope
+        allowEmptyValue: true
+        schema:
+          type: string
+          description: Cache key scope
+          example: user
+        example: user
       requestBody:
         description: Data contains JSON payload that will be used for task execution.
         required: true
@@ -50,9 +68,9 @@ paths:
             schema:
               type: string
               description: Data contains JSON payload that will be used for task execution.
-              example: Nam et.
+              example: Eveniet et eligendi sint quibusdam quia maxime.
               format: binary
-            example: Corrupti quia autem dolorum sunt aperiam quaerat.
+            example: Ipsam et est accusantium.
       responses:
         "200":
           description: OK response.
@@ -61,7 +79,7 @@ paths:
               schema:
                 $ref: '#/components/schemas/CreateResult'
               example:
-                taskID: Pariatur sequi et.
+                taskID: Facere quibusdam voluptate beatae.
 components:
   schemas:
     CreateResult:
@@ -70,9 +88,9 @@ components:
         taskID:
           type: string
           description: Unique task identifier.
-          example: Facere quibusdam voluptate beatae.
+          example: Et ipsa voluptate.
       example:
-        taskID: Eaque excepturi suscipit veritatis nemo.
+        taskID: Quo qui fuga impedit eos fuga et.
       required:
       - taskID
 tags:
diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go
index 5ead7e9..b6ff7fb 100644
--- a/gen/http/task/client/cli.go
+++ b/gen/http/task/client/cli.go
@@ -16,24 +16,38 @@ import (
 
 // BuildCreatePayload builds the payload for the task Create endpoint from CLI
 // flags.
-func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string) (*task.CreateRequest, error) {
+func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCreateCacheNamespace string, taskCreateCacheScope string) (*task.CreateRequest, error) {
 	var err error
 	var body interface{}
 	{
 		err = json.Unmarshal([]byte(taskCreateBody), &body)
 		if err != nil {
-			return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Minus reprehenderit non quo nihil adipisci.\"")
+			return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"")
 		}
 	}
 	var taskName string
 	{
 		taskName = taskCreateTaskName
 	}
+	var cacheNamespace *string
+	{
+		if taskCreateCacheNamespace != "" {
+			cacheNamespace = &taskCreateCacheNamespace
+		}
+	}
+	var cacheScope *string
+	{
+		if taskCreateCacheScope != "" {
+			cacheScope = &taskCreateCacheScope
+		}
+	}
 	v := body
 	res := &task.CreateRequest{
 		Data: v,
 	}
 	res.TaskName = taskName
+	res.CacheNamespace = cacheNamespace
+	res.CacheScope = cacheScope
 
 	return res, nil
 }
diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go
index 5d4bfc7..52ac5bd 100644
--- a/gen/http/task/client/encode_decode.go
+++ b/gen/http/task/client/encode_decode.go
@@ -51,6 +51,14 @@ func EncodeCreateRequest(encoder func(*http.Request) goahttp.Encoder) func(*http
 		if !ok {
 			return goahttp.ErrInvalidType("task", "Create", "*task.CreateRequest", v)
 		}
+		if p.CacheNamespace != nil {
+			head := *p.CacheNamespace
+			req.Header.Set("x-cache-namespace", head)
+		}
+		if p.CacheScope != nil {
+			head := *p.CacheScope
+			req.Header.Set("x-cache-scope", head)
+		}
 		body := p.Data
 		if err := encoder(req).Encode(&body); err != nil {
 			return goahttp.ErrEncodingError("task", "Create", err)
diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go
index 4e6ddfa..f4549af 100644
--- a/gen/http/task/server/encode_decode.go
+++ b/gen/http/task/server/encode_decode.go
@@ -46,12 +46,22 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp.
 		}
 
 		var (
-			taskName string
+			taskName       string
+			cacheNamespace *string
+			cacheScope     *string
 
 			params = mux.Vars(r)
 		)
 		taskName = params["taskName"]
-		payload := NewCreateRequest(body, taskName)
+		cacheNamespaceRaw := r.Header.Get("x-cache-namespace")
+		if cacheNamespaceRaw != "" {
+			cacheNamespace = &cacheNamespaceRaw
+		}
+		cacheScopeRaw := r.Header.Get("x-cache-scope")
+		if cacheScopeRaw != "" {
+			cacheScope = &cacheScopeRaw
+		}
+		payload := NewCreateRequest(body, taskName, cacheNamespace, cacheScope)
 
 		return payload, nil
 	}
diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go
index d0247fc..de98cf1 100644
--- a/gen/http/task/server/types.go
+++ b/gen/http/task/server/types.go
@@ -28,12 +28,14 @@ func NewCreateResponseBody(res *task.CreateResult) *CreateResponseBody {
 }
 
 // NewCreateRequest builds a task service Create endpoint payload.
-func NewCreateRequest(body interface{}, taskName string) *task.CreateRequest {
+func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, cacheScope *string) *task.CreateRequest {
 	v := body
 	res := &task.CreateRequest{
 		Data: v,
 	}
 	res.TaskName = taskName
+	res.CacheNamespace = cacheNamespace
+	res.CacheScope = cacheScope
 
 	return res
 }
diff --git a/gen/task/service.go b/gen/task/service.go
index 6bd66d5..13a5d28 100644
--- a/gen/task/service.go
+++ b/gen/task/service.go
@@ -33,6 +33,10 @@ type CreateRequest struct {
 	TaskName string
 	// Data contains JSON payload that will be used for task execution.
 	Data interface{}
+	// Cache key namespace.
+	CacheNamespace *string
+	// Cache key scope.
+	CacheScope *string
 }
 
 // CreateResult is the result type of the task service Create method.
diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go
index b4577a6..dc9033b 100644
--- a/internal/clients/cache/client.go
+++ b/internal/clients/cache/client.go
@@ -1,6 +1,7 @@
 package cache
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"net/http"
@@ -25,10 +26,31 @@ func New(addr string, opts ...Option) *Client {
 	return c
 }
 
-func (c *Client) Set(ctx context.Context, value []byte) error {
-	return fmt.Errorf("not implemented")
+func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error {
+	req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value))
+	if err != nil {
+		return err
+	}
+
+	req.Header = http.Header{
+		"x-cache-key":       []string{key},
+		"x-cache-namespace": []string{namespace},
+		"x-cache-scope":     []string{scope},
+	}
+
+	resp, err := c.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close() // nolint:errcheck
+
+	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected response code: %d", resp.StatusCode)
+	}
+
+	return nil
 }
 
-func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
+func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) {
 	return nil, fmt.Errorf("not implemented")
 }
diff --git a/internal/config/config.go b/internal/config/config.go
index a7a167e..f68e90b 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -7,6 +7,7 @@ type Config struct {
 	Mongo    mongoConfig
 	Policy   policyConfig
 	Executor executorConfig
+	Cache    cacheConfig
 
 	LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
 }
@@ -33,3 +34,7 @@ type executorConfig struct {
 	Workers      int           `envconfig:"EXECUTOR_WORKERS" default:"5"`
 	PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
 }
+
+type cacheConfig struct {
+	Addr string `envconfig:"CACHE_ADDR" required:"true"`
+}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 762df34..c59bc74 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -28,10 +28,16 @@ type Storage interface {
 	SaveTaskHistory(ctx context.Context, task *task.Task) error
 }
 
+type Cache interface {
+	Set(ctx context.Context, key, namespace, scope string, value []byte) error
+	Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
+}
+
 type Executor struct {
 	queue        Queue
 	policy       Policy
 	storage      Storage
+	cache        Cache
 	workers      int
 	pollInterval time.Duration
 
@@ -43,6 +49,7 @@ func New(
 	queue Queue,
 	policy Policy,
 	storage Storage,
+	cache Cache,
 	workers int,
 	pollInterval time.Duration,
 	httpClient *http.Client,
@@ -52,6 +59,7 @@ func New(
 		queue:        queue,
 		policy:       policy,
 		storage:      storage,
+		cache:        cache,
 		workers:      workers,
 		pollInterval: pollInterval,
 		httpClient:   httpClient,
@@ -68,7 +76,7 @@ func (e *Executor) Start(ctx context.Context) error {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			worker := newWorker(tasks, e.queue, e.policy, e.storage, e.httpClient, e.logger)
+			worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.httpClient, e.logger)
 			worker.Start(ctx)
 		}()
 	}
diff --git a/internal/executor/worker.go b/internal/executor/worker.go
index d03bdc1..dd36b3d 100644
--- a/internal/executor/worker.go
+++ b/internal/executor/worker.go
@@ -18,16 +18,26 @@ type Worker struct {
 	queue      Queue
 	policy     Policy
 	storage    Storage
+	cache      Cache
 	httpClient *http.Client
 	logger     *zap.Logger
 }
 
-func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, storage Storage, httpClient *http.Client, logger *zap.Logger) *Worker {
+func newWorker(
+	tasks chan *taskpkg.Task,
+	queue Queue,
+	policy Policy,
+	storage Storage,
+	cache Cache,
+	httpClient *http.Client,
+	logger *zap.Logger,
+) *Worker {
 	return &Worker{
 		tasks:      tasks,
 		queue:      queue,
 		policy:     policy,
 		storage:    storage,
+		cache:      cache,
 		httpClient: httpClient,
 		logger:     logger,
 	}
@@ -55,6 +65,20 @@ func (w *Worker) Start(ctx context.Context) {
 				continue
 			}
 
+			if err := w.cache.Set(
+				ctx,
+				executed.ID,
+				executed.CacheNamespace,
+				executed.CacheScope,
+				executed.Response,
+			); err != nil {
+				logger.Error("error storing task result in cache", zap.Error(err))
+				if err := w.queue.Unack(ctx, t); err != nil {
+					logger.Error("failed to unack task in queue", zap.Error(err))
+				}
+				continue
+			}
+
 			if err := w.storage.SaveTaskHistory(ctx, executed); err != nil {
 				logger.Error("error saving task history", zap.Error(err))
 				continue
@@ -102,14 +126,16 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task
 
 	// evaluate finalizer policy
 	if task.FinalPolicy != "" {
-		if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil {
+		resp, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response)
+		if err != nil {
 			return nil, errors.New("error evaluating final policy", err)
 		}
+		// overwrite task response with the one returned from the policy
+		task.Response = resp
 	}
 
 	task.FinishedAt = time.Now()
 	return task, nil
-
 }
 
 func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) {
diff --git a/internal/service/task/service.go b/internal/service/task/service.go
index 47a095c..78b713d 100644
--- a/internal/service/task/service.go
+++ b/internal/service/task/service.go
@@ -12,10 +12,12 @@ import (
 	goatask "code.vereign.com/gaiax/tsa/task/gen/task"
 )
 
+// Storage for retrieving predefined task templates.
 type Storage interface {
 	TaskTemplate(ctx context.Context, taskName string) (*Task, error)
 }
 
+// Queue interface for retrieving, returning and removing tasks from Queue.
 type Queue interface {
 	Add(ctx context.Context, task *Task) error
 	Poll(ctx context.Context) (*Task, error)
@@ -29,6 +31,7 @@ type Service struct {
 	logger  *zap.Logger
 }
 
+// New creates the task service.
 func New(template Storage, queue Queue, logger *zap.Logger) *Service {
 	return &Service{
 		storage: template,
@@ -37,7 +40,7 @@ func New(template Storage, queue Queue, logger *zap.Logger) *Service {
 	}
 }
 
-// Create a new task.
+// Create a new task and put it in a Queue for later execution.
 func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) {
 	logger := s.logger.With(zap.String("taskName", req.TaskName))
 
@@ -45,7 +48,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
 	task, err := s.storage.TaskTemplate(ctx, req.TaskName)
 	if err != nil {
 		logger.Error("error getting task template from storage", zap.Error(err))
-		return nil, errors.New("error executing task", err)
+		return nil, err
 	}
 
 	taskRequest, err := json.Marshal(req.Data)
@@ -59,6 +62,14 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
 	task.CreatedAt = time.Now()
 	task.Request = taskRequest
 
+	// if cache key namespace and scope are given, use them instead of the defaults
+	if req.CacheNamespace != nil && *req.CacheNamespace != "" {
+		task.CacheNamespace = *req.CacheNamespace
+	}
+	if req.CacheScope != nil && *req.CacheScope != "" {
+		task.CacheScope = *req.CacheScope
+	}
+
 	if err := s.queue.Add(ctx, task); err != nil {
 		logger.Error("error adding task to queue", zap.Error(err))
 		return nil, errors.New("failed to create task", err)
diff --git a/internal/service/task/task.go b/internal/service/task/task.go
index 78a25a4..962a887 100644
--- a/internal/service/task/task.go
+++ b/internal/service/task/task.go
@@ -1,6 +1,7 @@
 package task
 
 import (
+	"strings"
 	"time"
 )
 
@@ -28,11 +29,12 @@ type Task struct {
 	Method         string    `json:"method"`         // HTTP method of the task request.
 	Request        []byte    `json:"request"`        // Request body which will be sent in the task request.
 	Response       []byte    `json:"response"`       // Response received after the task request is executed.
-	ResponseCode   int       `json:"responseCode"`   // Response code received after task request is executed.
-	RequestPolicy  string    `json:"requestPolicy"`  // Request policy to be executed before task request execution.
-	ResponsePolicy string    `json:"responsePolicy"` // Response policy to be executed before the response is accepted.
-	FinalPolicy    string    `json:"finalPolicy"`    // TODO(penkovski): how is different from ResponsePolicy?
-	CacheKey       string    `json:"cacheKey"`       // CacheKey used for storing the response in cache.
+	ResponseCode   int       `json:"responseCode"`   // ResponseCode received after task request is executed.
+	RequestPolicy  string    `json:"requestPolicy"`  // RequestPolicy to be executed before task request execution.
+	ResponsePolicy string    `json:"responsePolicy"` // ResponsePolicy to be executed on the task response.
+	FinalPolicy    string    `json:"finalPolicy"`    // FinalPolicy to be executed on the task response.
+	CacheNamespace string    `json:"cacheNamespace"` // CacheNamespace if set, is used for constructing cache key.
+	CacheScope     string    `json:"cacheScope"`     // CacheScope if set, is used for constructing cache key.
 	CreatedAt      time.Time `json:"createdAt"`      // CreatedAt specifies task creation time.
 	StartedAt      time.Time `json:"startedAt"`      // StartedAt specifies task execution start time.
 	FinishedAt     time.Time `json:"finishedAt"`     // FinishedAt specifies the time when the task is done.
@@ -48,3 +50,17 @@ type Group struct {
 	StartTime   time.Time
 	FinishTime  time.Time
 }
+
+// CacheKey constructs the key for storing task result in the cache.
+func (t *Task) CacheKey() string {
+	key := t.ID
+	namespace := strings.TrimSpace(t.CacheNamespace)
+	scope := strings.TrimSpace(t.CacheScope)
+	if namespace != "" {
+		key += "," + namespace
+	}
+	if scope != "" {
+		key += "," + scope
+	}
+	return key
+}
-- 
GitLab