diff --git a/cmd/task/main.go b/cmd/task/main.go index a349fd17c7b7cb703187fde31e9cb172592a528a..e31f03b83bafe6af209aa2a8d75d008fe0418428 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -90,7 +90,7 @@ func main() { healthSvc goahealth.Service ) { - taskSvc = task.New(storage, storage, logger) + taskSvc = task.New(storage, storage, cache, logger) healthSvc = health.New() } diff --git a/design/design.go b/design/design.go index 0e21586fbef9ea8846c3c78c87d649076b2398c8..f78f0680296dcbe79ae5a2edd5e413d12446c117 100644 --- a/design/design.go +++ b/design/design.go @@ -37,6 +37,16 @@ var _ = Service("task", func() { Response(StatusOK) }) }) + + Method("TaskResult", func() { + Description("TaskResult retrieves task result from the Cache service.") + Payload(TaskResultRequest) + Result(Any) + HTTP(func() { + GET("/v1/taskResult/{taskID}") + Response(StatusOK) + }) + }) }) var _ = Service("health", func() { diff --git a/design/types.go b/design/types.go index ed8662a42b4811224176ed58405dafc801b6f082..7625fe78437e5f08fb4a3a4c2c9cce2f699b316f 100644 --- a/design/types.go +++ b/design/types.go @@ -15,3 +15,8 @@ var CreateResult = Type("CreateResult", func() { Field(1, "taskID", String, "Unique task identifier.") Required("taskID") }) + +var TaskResultRequest = Type("TaskResultRequest", func() { + Field(1, "taskID", String, "Unique task identifier.") + Required("taskID") +}) diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go index c859f0e4edee502712eaaba346baa5782937eddc..ca12630d5df90424c74955045a70afd451dcb583 100644 --- a/gen/http/cli/task/cli.go +++ b/gen/http/cli/task/cli.go @@ -25,14 +25,14 @@ import ( // func UsageCommands() string { return `health (liveness|readiness) -task create +task (create|task-result) ` } // UsageExamples produces an example of a valid invocation of the CLI tool. func UsageExamples() string { return os.Args[0] + ` health liveness` + "\n" + - os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" + + os.Args[0] + ` task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et."` + "\n" + "" } @@ -59,6 +59,9 @@ func ParseEndpoint( taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "") taskCreateCacheScopeFlag = taskCreateFlags.String("cache-scope", "", "") + + taskTaskResultFlags = flag.NewFlagSet("task-result", flag.ExitOnError) + taskTaskResultTaskIDFlag = taskTaskResultFlags.String("task-id", "REQUIRED", "Unique task identifier.") ) healthFlags.Usage = healthUsage healthLivenessFlags.Usage = healthLivenessUsage @@ -66,6 +69,7 @@ func ParseEndpoint( taskFlags.Usage = taskUsage taskCreateFlags.Usage = taskCreateUsage + taskTaskResultFlags.Usage = taskTaskResultUsage if err := flag.CommandLine.Parse(os.Args[1:]); err != nil { return nil, nil, err @@ -116,6 +120,9 @@ func ParseEndpoint( case "create": epf = taskCreateFlags + case "task-result": + epf = taskTaskResultFlags + } } @@ -154,6 +161,9 @@ func ParseEndpoint( case "create": endpoint = c.Create() data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag) + case "task-result": + endpoint = c.TaskResult() + data, err = taskc.BuildTaskResultPayload(*taskTaskResultTaskIDFlag) } } } @@ -206,6 +216,7 @@ Usage: COMMAND: create: Create a task and put it in a queue for execution. + task-result: TaskResult retrieves task result from the Cache service. Additional help: %[1]s task COMMAND --help @@ -221,6 +232,17 @@ Create a task and put it in a queue for execution. -cache-scope STRING: Example: - %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae." + %[1]s task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et." +`, os.Args[0]) +} + +func taskTaskResultUsage() { + fmt.Fprintf(os.Stderr, `%[1]s [flags] task task-result -task-id STRING + +TaskResult retrieves task result from the Cache service. + -task-id STRING: Unique task identifier. + +Example: + %[1]s task task-result --task-id "Et ipsa voluptate." `, os.Args[0]) } diff --git a/gen/http/openapi.json b/gen/http/openapi.json index cbf1687e983ec137eb1e5d86c0eefec31be36b3d..78e36c4475d9a753284914cfcbc51172644ffe72 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","format":"binary"}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Vel odio et doloribus est quod laborum."}},"example":{"taskID":"Harum aut autem aliquam dolorem non soluta."},"required":["taskID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index ca530f83c341213f3f961f435c8ecf88d4d75f6e..1ccd3b61d33f7ff620ecc4d959899033d4d70ded 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -74,6 +74,27 @@ paths: - taskID schemes: - http + /v1/taskResult/{taskID}: + get: + tags: + - task + summary: TaskResult task + description: TaskResult retrieves task result from the Cache service. + operationId: task#TaskResult + parameters: + - name: taskID + in: path + description: Unique task identifier. + required: true + type: string + responses: + "200": + description: OK response. + schema: + type: string + format: binary + schemes: + - http definitions: TaskCreateResponseBody: title: TaskCreateResponseBody @@ -82,8 +103,8 @@ definitions: taskID: type: string description: Unique task identifier. - example: Eaque excepturi suscipit veritatis nemo. + example: Vel odio et doloribus est quod laborum. example: - taskID: Corrupti quia autem dolorum sunt aperiam quaerat. + taskID: Harum aut autem aliquam dolorem non soluta. required: - taskID diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index deb72810cb4565839f30f5ddb709475fc8d2bebf..926387963b08082fda67874bb159f6e86d4c8295 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Aut qui aut itaque et commodi vel."},"example":"Sapiente et."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Excepturi aut consequatur animi rerum.","format":"binary"},"example":"Dolorem illo officiis ipsa impedit harum et."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."}}}}}}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"schema":{"type":"string","description":"Unique task identifier.","example":"Delectus natus eos."},"example":"Quae ut dolores ab."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","example":"Illum quidem sapiente sed velit.","format":"binary"},"example":"Omnis commodi reiciendis eum non."}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Dolores atque error ab."}},"example":{"taskID":"Laboriosam perspiciatis vitae numquam."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 8e14b7c0de5748cbd7901c7eea9745a8feb357ca..7822203e80727be6ad349a505c7520f5f53bf1be 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -40,8 +40,8 @@ paths: schema: type: string description: Task name. - example: Vel odio et doloribus est quod laborum. - example: Harum aut autem aliquam dolorem non soluta. + example: Aut qui aut itaque et commodi vel. + example: Sapiente et. - name: x-cache-namespace in: header description: Cache key namespace @@ -68,9 +68,9 @@ paths: schema: type: string description: Data contains JSON payload that will be used for task execution. - example: Eveniet et eligendi sint quibusdam quia maxime. + example: Excepturi aut consequatur animi rerum. format: binary - example: Ipsam et est accusantium. + example: Dolorem illo officiis ipsa impedit harum et. responses: "200": description: OK response. @@ -79,7 +79,34 @@ paths: schema: $ref: '#/components/schemas/CreateResult' example: - taskID: Facere quibusdam voluptate beatae. + taskID: Corrupti quia autem dolorum sunt aperiam quaerat. + /v1/taskResult/{taskID}: + get: + tags: + - task + summary: TaskResult task + description: TaskResult retrieves task result from the Cache service. + operationId: task#TaskResult + parameters: + - name: taskID + in: path + description: Unique task identifier. + required: true + schema: + type: string + description: Unique task identifier. + example: Delectus natus eos. + example: Quae ut dolores ab. + responses: + "200": + description: OK response. + content: + application/json: + schema: + type: string + example: Illum quidem sapiente sed velit. + format: binary + example: Omnis commodi reiciendis eum non. components: schemas: CreateResult: @@ -88,9 +115,9 @@ components: taskID: type: string description: Unique task identifier. - example: Et ipsa voluptate. + example: Dolores atque error ab. example: - taskID: Quo qui fuga impedit eos fuga et. + taskID: Laboriosam perspiciatis vitae numquam. required: - taskID tags: diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go index b6ff7fb25a49ddd34dfa22013d4c4d398536335f..f4e478f9179ff782d6cf169ef51804e1f7d6eed9 100644 --- a/gen/http/task/client/cli.go +++ b/gen/http/task/client/cli.go @@ -22,7 +22,7 @@ func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCr { err = json.Unmarshal([]byte(taskCreateBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Eaque excepturi suscipit veritatis nemo.\"") } } var taskName string @@ -51,3 +51,16 @@ func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCr return res, nil } + +// BuildTaskResultPayload builds the payload for the task TaskResult endpoint +// from CLI flags. +func BuildTaskResultPayload(taskTaskResultTaskID string) (*task.TaskResultRequest, error) { + var taskID string + { + taskID = taskTaskResultTaskID + } + v := &task.TaskResultRequest{} + v.TaskID = taskID + + return v, nil +} diff --git a/gen/http/task/client/client.go b/gen/http/task/client/client.go index 2e27b44317476b4161315e912950aa8520a444ca..6a7fd72c35116d724783150081a061e53aeb0806 100644 --- a/gen/http/task/client/client.go +++ b/gen/http/task/client/client.go @@ -20,6 +20,10 @@ type Client struct { // Create Doer is the HTTP client used to make requests to the Create endpoint. CreateDoer goahttp.Doer + // TaskResult Doer is the HTTP client used to make requests to the TaskResult + // endpoint. + TaskResultDoer goahttp.Doer + // RestoreResponseBody controls whether the response bodies are reset after // decoding so they can be read again. RestoreResponseBody bool @@ -41,6 +45,7 @@ func NewClient( ) *Client { return &Client{ CreateDoer: doer, + TaskResultDoer: doer, RestoreResponseBody: restoreBody, scheme: scheme, host: host, @@ -72,3 +77,22 @@ func (c *Client) Create() goa.Endpoint { return decodeResponse(resp) } } + +// TaskResult returns an endpoint that makes HTTP requests to the task service +// TaskResult server. +func (c *Client) TaskResult() goa.Endpoint { + var ( + decodeResponse = DecodeTaskResultResponse(c.decoder, c.RestoreResponseBody) + ) + return func(ctx context.Context, v interface{}) (interface{}, error) { + req, err := c.BuildTaskResultRequest(ctx, v) + if err != nil { + return nil, err + } + resp, err := c.TaskResultDoer.Do(req) + if err != nil { + return nil, goahttp.ErrRequestError("task", "TaskResult", err) + } + return decodeResponse(resp) + } +} diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go index 52ac5bd62e0c8889272354c4336f550af8ea593f..a7ecbf5caea14083c7bcfdac4483920e08fa1835 100644 --- a/gen/http/task/client/encode_decode.go +++ b/gen/http/task/client/encode_decode.go @@ -106,3 +106,63 @@ func DecodeCreateResponse(decoder func(*http.Response) goahttp.Decoder, restoreB } } } + +// BuildTaskResultRequest instantiates a HTTP request object with method and +// path set to call the "task" service "TaskResult" endpoint +func (c *Client) BuildTaskResultRequest(ctx context.Context, v interface{}) (*http.Request, error) { + var ( + taskID string + ) + { + p, ok := v.(*task.TaskResultRequest) + if !ok { + return nil, goahttp.ErrInvalidType("task", "TaskResult", "*task.TaskResultRequest", v) + } + taskID = p.TaskID + } + u := &url.URL{Scheme: c.scheme, Host: c.host, Path: TaskResultTaskPath(taskID)} + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, goahttp.ErrInvalidURL("task", "TaskResult", u.String(), err) + } + if ctx != nil { + req = req.WithContext(ctx) + } + + return req, nil +} + +// DecodeTaskResultResponse returns a decoder for responses returned by the +// task TaskResult endpoint. restoreBody controls whether the response body +// should be restored after having been read. +func DecodeTaskResultResponse(decoder func(*http.Response) goahttp.Decoder, restoreBody bool) func(*http.Response) (interface{}, error) { + return func(resp *http.Response) (interface{}, error) { + if restoreBody { + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + defer func() { + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + }() + } else { + defer resp.Body.Close() + } + switch resp.StatusCode { + case http.StatusOK: + var ( + body interface{} + err error + ) + err = decoder(resp).Decode(&body) + if err != nil { + return nil, goahttp.ErrDecodingError("task", "TaskResult", err) + } + return body, nil + default: + body, _ := ioutil.ReadAll(resp.Body) + return nil, goahttp.ErrInvalidResponse("task", "TaskResult", resp.StatusCode, string(body)) + } + } +} diff --git a/gen/http/task/client/paths.go b/gen/http/task/client/paths.go index f3f5bc05dd782f6183c29b1eeed9f3fc94b26f6a..1b210e99f5d88b733fea49e43fcfc6549c8d58c0 100644 --- a/gen/http/task/client/paths.go +++ b/gen/http/task/client/paths.go @@ -15,3 +15,8 @@ import ( func CreateTaskPath(taskName string) string { return fmt.Sprintf("/v1/task/%v", taskName) } + +// TaskResultTaskPath returns the URL path to the task service TaskResult HTTP endpoint. +func TaskResultTaskPath(taskID string) string { + return fmt.Sprintf("/v1/taskResult/%v", taskID) +} diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go index f4549af31a070fd6f67faa7273753f1475c0929b..716de0ada1b13cec9bb4401ef6e06a8ec19b3a26 100644 --- a/gen/http/task/server/encode_decode.go +++ b/gen/http/task/server/encode_decode.go @@ -66,3 +66,31 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. return payload, nil } } + +// EncodeTaskResultResponse returns an encoder for responses returned by the +// task TaskResult endpoint. +func EncodeTaskResultResponse(encoder func(context.Context, http.ResponseWriter) goahttp.Encoder) func(context.Context, http.ResponseWriter, interface{}) error { + return func(ctx context.Context, w http.ResponseWriter, v interface{}) error { + res, _ := v.(interface{}) + enc := encoder(ctx, w) + body := res + w.WriteHeader(http.StatusOK) + return enc.Encode(body) + } +} + +// DecodeTaskResultRequest returns a decoder for requests sent to the task +// TaskResult endpoint. +func DecodeTaskResultRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp.Decoder) func(*http.Request) (interface{}, error) { + return func(r *http.Request) (interface{}, error) { + var ( + taskID string + + params = mux.Vars(r) + ) + taskID = params["taskID"] + payload := NewTaskResultRequest(taskID) + + return payload, nil + } +} diff --git a/gen/http/task/server/paths.go b/gen/http/task/server/paths.go index 6d72cc965b9783a051a2df93e48ee1f394d21188..cd34675679b694ad6b137837079466ddad4a3228 100644 --- a/gen/http/task/server/paths.go +++ b/gen/http/task/server/paths.go @@ -15,3 +15,8 @@ import ( func CreateTaskPath(taskName string) string { return fmt.Sprintf("/v1/task/%v", taskName) } + +// TaskResultTaskPath returns the URL path to the task service TaskResult HTTP endpoint. +func TaskResultTaskPath(taskID string) string { + return fmt.Sprintf("/v1/taskResult/%v", taskID) +} diff --git a/gen/http/task/server/server.go b/gen/http/task/server/server.go index 8dcbd4c96344e3dce09c4b848f33c8a7ccb8c263..589e54c33f58b7988bb7fbf8d374f5b881ee0bf5 100644 --- a/gen/http/task/server/server.go +++ b/gen/http/task/server/server.go @@ -18,8 +18,9 @@ import ( // Server lists the task service endpoint HTTP handlers. type Server struct { - Mounts []*MountPoint - Create http.Handler + Mounts []*MountPoint + Create http.Handler + TaskResult http.Handler } // ErrorNamer is an interface implemented by generated error structs that @@ -56,8 +57,10 @@ func New( return &Server{ Mounts: []*MountPoint{ {"Create", "POST", "/v1/task/{taskName}"}, + {"TaskResult", "GET", "/v1/taskResult/{taskID}"}, }, - Create: NewCreateHandler(e.Create, mux, decoder, encoder, errhandler, formatter), + Create: NewCreateHandler(e.Create, mux, decoder, encoder, errhandler, formatter), + TaskResult: NewTaskResultHandler(e.TaskResult, mux, decoder, encoder, errhandler, formatter), } } @@ -67,11 +70,13 @@ func (s *Server) Service() string { return "task" } // Use wraps the server handlers with the given middleware. func (s *Server) Use(m func(http.Handler) http.Handler) { s.Create = m(s.Create) + s.TaskResult = m(s.TaskResult) } // Mount configures the mux to serve the task endpoints. func Mount(mux goahttp.Muxer, h *Server) { MountCreateHandler(mux, h.Create) + MountTaskResultHandler(mux, h.TaskResult) } // Mount configures the mux to serve the task endpoints. @@ -129,3 +134,54 @@ func NewCreateHandler( } }) } + +// MountTaskResultHandler configures the mux to serve the "task" service +// "TaskResult" endpoint. +func MountTaskResultHandler(mux goahttp.Muxer, h http.Handler) { + f, ok := h.(http.HandlerFunc) + if !ok { + f = func(w http.ResponseWriter, r *http.Request) { + h.ServeHTTP(w, r) + } + } + mux.Handle("GET", "/v1/taskResult/{taskID}", f) +} + +// NewTaskResultHandler creates a HTTP handler which loads the HTTP request and +// calls the "task" service "TaskResult" endpoint. +func NewTaskResultHandler( + endpoint goa.Endpoint, + mux goahttp.Muxer, + decoder func(*http.Request) goahttp.Decoder, + encoder func(context.Context, http.ResponseWriter) goahttp.Encoder, + errhandler func(context.Context, http.ResponseWriter, error), + formatter func(err error) goahttp.Statuser, +) http.Handler { + var ( + decodeRequest = DecodeTaskResultRequest(mux, decoder) + encodeResponse = EncodeTaskResultResponse(encoder) + encodeError = goahttp.ErrorEncoder(encoder, formatter) + ) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), goahttp.AcceptTypeKey, r.Header.Get("Accept")) + ctx = context.WithValue(ctx, goa.MethodKey, "TaskResult") + ctx = context.WithValue(ctx, goa.ServiceKey, "task") + payload, err := decodeRequest(r) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + res, err := endpoint(ctx, payload) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + if err := encodeResponse(ctx, w, res); err != nil { + errhandler(ctx, w, err) + } + }) +} diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go index de98cf1a081580e66992b3dd65a5c5e6295d7a99..c1482c9a8bc09f8781fd87e399293afa5c4392c6 100644 --- a/gen/http/task/server/types.go +++ b/gen/http/task/server/types.go @@ -39,3 +39,11 @@ func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, return res } + +// NewTaskResultRequest builds a task service TaskResult endpoint payload. +func NewTaskResultRequest(taskID string) *task.TaskResultRequest { + v := &task.TaskResultRequest{} + v.TaskID = taskID + + return v +} diff --git a/gen/task/client.go b/gen/task/client.go index fb003d3f4bb7fbb112d0844cbf9ec1a3a80e87cd..3f812485b1c4b9ff706e6278e0bb385f7f4da636 100644 --- a/gen/task/client.go +++ b/gen/task/client.go @@ -15,13 +15,15 @@ import ( // Client is the "task" service client. type Client struct { - CreateEndpoint goa.Endpoint + CreateEndpoint goa.Endpoint + TaskResultEndpoint goa.Endpoint } // NewClient initializes a "task" service client given the endpoints. -func NewClient(create goa.Endpoint) *Client { +func NewClient(create, taskResult goa.Endpoint) *Client { return &Client{ - CreateEndpoint: create, + CreateEndpoint: create, + TaskResultEndpoint: taskResult, } } @@ -34,3 +36,13 @@ func (c *Client) Create(ctx context.Context, p *CreateRequest) (res *CreateResul } return ires.(*CreateResult), nil } + +// TaskResult calls the "TaskResult" endpoint of the "task" service. +func (c *Client) TaskResult(ctx context.Context, p *TaskResultRequest) (res interface{}, err error) { + var ires interface{} + ires, err = c.TaskResultEndpoint(ctx, p) + if err != nil { + return + } + return ires.(interface{}), nil +} diff --git a/gen/task/endpoints.go b/gen/task/endpoints.go index c5d61e36c2451113c04536a2b125e790c55c7c83..b648dd83cb34f626e54dbfba734439cbfbe813ca 100644 --- a/gen/task/endpoints.go +++ b/gen/task/endpoints.go @@ -15,19 +15,22 @@ import ( // Endpoints wraps the "task" service endpoints. type Endpoints struct { - Create goa.Endpoint + Create goa.Endpoint + TaskResult goa.Endpoint } // NewEndpoints wraps the methods of the "task" service with endpoints. func NewEndpoints(s Service) *Endpoints { return &Endpoints{ - Create: NewCreateEndpoint(s), + Create: NewCreateEndpoint(s), + TaskResult: NewTaskResultEndpoint(s), } } // Use applies the given middleware to all the "task" service endpoints. func (e *Endpoints) Use(m func(goa.Endpoint) goa.Endpoint) { e.Create = m(e.Create) + e.TaskResult = m(e.TaskResult) } // NewCreateEndpoint returns an endpoint function that calls the method @@ -38,3 +41,12 @@ func NewCreateEndpoint(s Service) goa.Endpoint { return s.Create(ctx, p) } } + +// NewTaskResultEndpoint returns an endpoint function that calls the method +// "TaskResult" of service "task". +func NewTaskResultEndpoint(s Service) goa.Endpoint { + return func(ctx context.Context, req interface{}) (interface{}, error) { + p := req.(*TaskResultRequest) + return s.TaskResult(ctx, p) + } +} diff --git a/gen/task/service.go b/gen/task/service.go index 13a5d28ac0ffbde98d80d9c898cdb789b16c2faa..f5c4b6954680960745e92c7556d1f243413b7647 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -15,6 +15,8 @@ import ( type Service interface { // Create a task and put it in a queue for execution. Create(context.Context, *CreateRequest) (res *CreateResult, err error) + // TaskResult retrieves task result from the Cache service. + TaskResult(context.Context, *TaskResultRequest) (res interface{}, err error) } // ServiceName is the name of the service as defined in the design. This is the @@ -25,7 +27,7 @@ const ServiceName = "task" // MethodNames lists the service method names as defined in the design. These // are the same values that are set in the endpoint request contexts under the // MethodKey key. -var MethodNames = [1]string{"Create"} +var MethodNames = [2]string{"Create", "TaskResult"} // CreateRequest is the payload type of the task service Create method. type CreateRequest struct { @@ -44,3 +46,9 @@ type CreateResult struct { // Unique task identifier. TaskID string } + +// TaskResultRequest is the payload type of the task service TaskResult method. +type TaskResultRequest struct { + // Unique task identifier. + TaskID string +} diff --git a/go.mod b/go.mod index 208c120b1a568cbaf999a06fea637b2987e83bd5..165c96aecfe53e413a79809a61f38a94ac8b4f0c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/google/uuid v1.3.0 github.com/kelseyhightower/envconfig v1.4.0 + github.com/stretchr/testify v1.7.0 go.mongodb.org/mongo-driver v1.8.4 go.uber.org/zap v1.21.0 goa.design/goa/v3 v3.7.0 @@ -14,6 +15,7 @@ require ( ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 // indirect github.com/dimfeld/httptreemux/v5 v5.4.0 // indirect github.com/go-stack/stack v1.8.0 // indirect @@ -25,6 +27,7 @@ require ( github.com/klauspost/compress v1.13.6 // indirect github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sergi/go-diff v1.2.0 // indirect github.com/smartystreets/assertions v1.2.1 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -41,4 +44,5 @@ require ( golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go index dc9033bd88e9d35fa5a4663d213dac66a81a491e..7ad51dceaab26442febc5d8afffb29eba1812727 100644 --- a/internal/clients/cache/client.go +++ b/internal/clients/cache/client.go @@ -4,7 +4,10 @@ import ( "bytes" "context" "fmt" + "io" "net/http" + + "code.vereign.com/gaiax/tsa/golib/errors" ) // Client for the Cache service. @@ -27,7 +30,7 @@ func New(addr string, opts ...Option) *Client { } func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error { - req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value)) + req, err := http.NewRequestWithContext(ctx, "POST", c.addr+"/v1/cache", bytes.NewReader(value)) if err != nil { return err } @@ -38,19 +41,44 @@ func (c *Client) Set(ctx context.Context, key, namespace, scope string, value [] "x-cache-scope": []string{scope}, } - resp, err := c.httpClient.Do(req.WithContext(ctx)) + resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() // nolint:errcheck if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected response code: %d", resp.StatusCode) + msg := fmt.Sprintf("unexpected response: %d %s", resp.StatusCode, resp.Status) + return errors.New(errors.GetKind(resp.StatusCode), msg) } return nil } func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) { - return nil, fmt.Errorf("not implemented") + req, err := http.NewRequestWithContext(ctx, "GET", c.addr+"/v1/cache", nil) + req.Header = http.Header{ + "x-cache-key": []string{key}, + "x-cache-namespace": []string{namespace}, + "x-cache-scope": []string{scope}, + } + if err != nil { + return nil, err + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, errors.New(errors.NotFound) + } + msg := fmt.Sprintf("unexpected response: %d %s", resp.StatusCode, resp.Status) + return nil, errors.New(errors.GetKind(resp.StatusCode), msg) + } + + return io.ReadAll(resp.Body) } diff --git a/internal/executor/worker.go b/internal/executor/worker.go index feb9d3f81eddbd82698bd4d14ca373a6572cc1e5..aea032d32ef535db6a96d4b28a94335cd9379f1c 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -107,6 +107,7 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task if err != nil { return nil, errors.New("error evaluating request policy", err) } + task.ResponseCode = http.StatusOK } else if task.URL != "" && task.Method != "" { var status int status, response, err = w.doHTTPTask(ctx, task) diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 78b713dc31d6accfbeda69bd74f5cf406e2c82ab..a4ea25ad30982f806f5b9f1431107321c9601b9d 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "context" "encoding/json" "time" @@ -12,9 +13,15 @@ import ( goatask "code.vereign.com/gaiax/tsa/task/gen/task" ) +//go:generate counterfeiter . Storage +//go:generate counterfeiter . Queue +//go:generate counterfeiter . Cache + // Storage for retrieving predefined task templates. type Storage interface { TaskTemplate(ctx context.Context, taskName string) (*Task, error) + Task(ctx context.Context, taskID string) (*Task, error) + TaskHistory(ctx context.Context, taskID string) (*Task, error) } // Queue interface for retrieving, returning and removing tasks from Queue. @@ -25,23 +32,33 @@ type Queue interface { Unack(ctx context.Context, task *Task) error } +type Cache interface { + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + type Service struct { storage Storage queue Queue + cache Cache logger *zap.Logger } // New creates the task service. -func New(template Storage, queue Queue, logger *zap.Logger) *Service { +func New(template Storage, queue Queue, cache Cache, logger *zap.Logger) *Service { return &Service{ storage: template, queue: queue, + cache: cache, logger: logger, } } // Create a new task and put it in a Queue for later execution. func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *goatask.CreateResult, err error) { + if req.TaskName == "" { + return nil, errors.New(errors.BadRequest, "missing taskName") + } + logger := s.logger.With(zap.String("taskName", req.TaskName)) // get predefined task definition from storage @@ -77,3 +94,48 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * return &goatask.CreateResult{TaskID: task.ID}, nil } + +// TaskResult retrieves task result from the Cache service. +func (s *Service) TaskResult(ctx context.Context, req *goatask.TaskResultRequest) (res interface{}, err error) { + if req.TaskID == "" { + return nil, errors.New(errors.BadRequest, "missing taskID") + } + + logger := s.logger.With(zap.String("taskID", req.TaskID)) + + var task *Task + task, err = s.storage.TaskHistory(ctx, req.TaskID) + if err != nil && !errors.Is(errors.NotFound, err) { + logger.Error("error getting task from history collection", zap.Error(err)) + return nil, err + } + + if task == nil { + task, err = s.storage.Task(ctx, req.TaskID) + if err != nil { + if errors.Is(errors.NotFound, err) { + return nil, errors.New("task is not found", err) + } + logger.Error("error getting task from history collection", zap.Error(err)) + return nil, err + } + } + + if task.State != Done { + return nil, errors.New(errors.NotFound, "no result, task is not completed") + } + + value, err := s.cache.Get(ctx, task.ID, task.CacheNamespace, task.CacheScope) + if err != nil { + logger.Error("error getting task result from cache", zap.Error(err)) + return nil, err + } + + var result interface{} + if err := json.NewDecoder(bytes.NewReader(value)).Decode(&result); err != nil { + logger.Error("error decoding result from cache", zap.Error(err)) + return nil, errors.New("error decoding result from cache", err) + } + + return result, nil +} diff --git a/internal/service/task/service_test.go b/internal/service/task/service_test.go new file mode 100644 index 0000000000000000000000000000000000000000..70051423452d36b66e184e8d3ea6f28fb504db3b --- /dev/null +++ b/internal/service/task/service_test.go @@ -0,0 +1,266 @@ +package task_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/golib/ptr" + goatask "code.vereign.com/gaiax/tsa/task/gen/task" + "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/task/internal/service/task/taskfakes" +) + +func TestNew(t *testing.T) { + svc := task.New(nil, nil, nil, zap.NewNop()) + assert.Implements(t, (*goatask.Service)(nil), svc) +} + +func TestService_Create(t *testing.T) { + tests := []struct { + name string + req *goatask.CreateRequest + storage *taskfakes.FakeStorage + queue *taskfakes.FakeQueue + cache *taskfakes.FakeCache + + errkind errors.Kind + errtext string + }{ + { + name: "empty task name", + req: &goatask.CreateRequest{}, + errkind: errors.BadRequest, + errtext: "missing taskName", + }, + { + name: "task template not found", + req: &goatask.CreateRequest{TaskName: "taskname"}, + storage: &taskfakes.FakeStorage{ + TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + }, + errkind: errors.NotFound, + errtext: "not found", + }, + { + name: "fail to add task to queue", + req: &goatask.CreateRequest{TaskName: "taskname"}, + storage: &taskfakes.FakeStorage{ + TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { + return &task.Task{}, nil + }, + }, + queue: &taskfakes.FakeQueue{ + AddStub: func(ctx context.Context, t *task.Task) error { + return errors.New("some error") + }, + }, + errkind: errors.Unknown, + errtext: "some error", + }, + { + name: "successfully add task to queue", + req: &goatask.CreateRequest{TaskName: "taskname"}, + storage: &taskfakes.FakeStorage{ + TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { + return &task.Task{}, nil + }, + }, + queue: &taskfakes.FakeQueue{ + AddStub: func(ctx context.Context, t *task.Task) error { + return nil + }, + }, + }, + { + name: "successfully add task to queue with namespace and scope", + req: &goatask.CreateRequest{ + TaskName: "taskname", + CacheNamespace: ptr.String("login"), + CacheScope: ptr.String("user"), + }, + storage: &taskfakes.FakeStorage{ + TaskTemplateStub: func(ctx context.Context, taskName string) (*task.Task, error) { + return &task.Task{}, nil + }, + }, + queue: &taskfakes.FakeQueue{ + AddStub: func(ctx context.Context, t *task.Task) error { + return nil + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + svc := task.New(test.storage, test.queue, test.cache, zap.NewNop()) + res, err := svc.Create(context.Background(), test.req) + if err != nil { + assert.NotEmpty(t, test.errtext) + e, ok := err.(*errors.Error) + assert.True(t, ok) + assert.Equal(t, test.errkind, e.Kind) + assert.Contains(t, e.Error(), test.errtext) + assert.Nil(t, res) + } else { + assert.Empty(t, test.errtext) + assert.NotNil(t, res) + assert.NotEmpty(t, res.TaskID) + } + }) + } +} + +func TestService_TaskResult(t *testing.T) { + tests := []struct { + name string + req *goatask.TaskResultRequest + storage *taskfakes.FakeStorage + cache *taskfakes.FakeCache + + res interface{} + errkind errors.Kind + errtext string + }{ + { + name: "missing taskID", + req: &goatask.TaskResultRequest{}, + errkind: errors.BadRequest, + errtext: "missing taskID", + }, + { + name: "error getting task history from storage", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New("some error") + }, + }, + errkind: errors.Unknown, + errtext: "some error", + }, + { + name: "task not found in history and fail to retrieve it from tasks queue collection too", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New("another error") + }, + }, + errkind: errors.Unknown, + errtext: "another error", + }, + { + name: "task not found neither in history nor in tasks queue collection", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + }, + errkind: errors.NotFound, + errtext: "task is not found", + }, + { + name: "task is not yet completed", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return &task.Task{State: task.Pending}, nil + }, + }, + errkind: errors.NotFound, + errtext: "no result, task is not completed", + }, + { + name: "error getting task result from cache", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return &task.Task{State: task.Done}, nil + }, + }, + cache: &taskfakes.FakeCache{ + GetStub: func(ctx context.Context, key string, ns string, scope string) ([]byte, error) { + return nil, errors.New("cache error") + }, + }, + errkind: errors.Unknown, + errtext: "cache error", + }, + { + name: "getting invalid JSON result from cache", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return &task.Task{State: task.Done}, nil + }, + }, + cache: &taskfakes.FakeCache{ + GetStub: func(ctx context.Context, key string, ns string, scope string) ([]byte, error) { + return []byte("asdfads"), nil + }, + }, + errkind: errors.Unknown, + errtext: "error decoding result from cache", + }, + { + name: "get task result successfully", + req: &goatask.TaskResultRequest{TaskID: "123"}, + storage: &taskfakes.FakeStorage{ + TaskHistoryStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return nil, errors.New(errors.NotFound) + }, + TaskStub: func(ctx context.Context, taskID string) (*task.Task, error) { + return &task.Task{State: task.Done}, nil + }, + }, + cache: &taskfakes.FakeCache{ + GetStub: func(ctx context.Context, key string, ns string, scope string) ([]byte, error) { + return []byte(`{"result":"success"}`), nil + }, + }, + res: map[string]interface{}{"result": "success"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + svc := task.New(test.storage, nil, test.cache, zap.NewNop()) + res, err := svc.TaskResult(context.Background(), test.req) + if err != nil { + assert.NotEmpty(t, test.errtext) + e, ok := err.(*errors.Error) + assert.True(t, ok) + assert.Equal(t, test.errkind, e.Kind) + assert.Contains(t, e.Error(), test.errtext) + assert.Nil(t, res) + } else { + assert.Empty(t, test.errtext) + assert.NotNil(t, res) + assert.Equal(t, test.res, res) + } + }) + } +} diff --git a/internal/service/task/taskfakes/fake_cache.go b/internal/service/task/taskfakes/fake_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..d08e54eb65473018bf8796dcc0d7a1e1fe4a9f13 --- /dev/null +++ b/internal/service/task/taskfakes/fake_cache.go @@ -0,0 +1,123 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package taskfakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type FakeCache struct { + GetStub func(context.Context, string, string, string) ([]byte, error) + getMutex sync.RWMutex + getArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + } + getReturns struct { + result1 []byte + result2 error + } + getReturnsOnCall map[int]struct { + result1 []byte + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeCache) Get(arg1 context.Context, arg2 string, arg3 string, arg4 string) ([]byte, error) { + fake.getMutex.Lock() + ret, specificReturn := fake.getReturnsOnCall[len(fake.getArgsForCall)] + fake.getArgsForCall = append(fake.getArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + }{arg1, arg2, arg3, arg4}) + stub := fake.GetStub + fakeReturns := fake.getReturns + fake.recordInvocation("Get", []interface{}{arg1, arg2, arg3, arg4}) + fake.getMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeCache) GetCallCount() int { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + return len(fake.getArgsForCall) +} + +func (fake *FakeCache) GetCalls(stub func(context.Context, string, string, string) ([]byte, error)) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = stub +} + +func (fake *FakeCache) GetArgsForCall(i int) (context.Context, string, string, string) { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + argsForCall := fake.getArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeCache) GetReturns(result1 []byte, result2 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + fake.getReturns = struct { + result1 []byte + result2 error + }{result1, result2} +} + +func (fake *FakeCache) GetReturnsOnCall(i int, result1 []byte, result2 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + if fake.getReturnsOnCall == nil { + fake.getReturnsOnCall = make(map[int]struct { + result1 []byte + result2 error + }) + } + fake.getReturnsOnCall[i] = struct { + result1 []byte + result2 error + }{result1, result2} +} + +func (fake *FakeCache) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeCache) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ task.Cache = new(FakeCache) diff --git a/internal/service/task/taskfakes/fake_queue.go b/internal/service/task/taskfakes/fake_queue.go new file mode 100644 index 0000000000000000000000000000000000000000..3366db2111a32454ea0fe27cbb172abc6375400c --- /dev/null +++ b/internal/service/task/taskfakes/fake_queue.go @@ -0,0 +1,345 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package taskfakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type FakeQueue struct { + AckStub func(context.Context, *task.Task) error + ackMutex sync.RWMutex + ackArgsForCall []struct { + arg1 context.Context + arg2 *task.Task + } + ackReturns struct { + result1 error + } + ackReturnsOnCall map[int]struct { + result1 error + } + AddStub func(context.Context, *task.Task) error + addMutex sync.RWMutex + addArgsForCall []struct { + arg1 context.Context + arg2 *task.Task + } + addReturns struct { + result1 error + } + addReturnsOnCall map[int]struct { + result1 error + } + PollStub func(context.Context) (*task.Task, error) + pollMutex sync.RWMutex + pollArgsForCall []struct { + arg1 context.Context + } + pollReturns struct { + result1 *task.Task + result2 error + } + pollReturnsOnCall map[int]struct { + result1 *task.Task + result2 error + } + UnackStub func(context.Context, *task.Task) error + unackMutex sync.RWMutex + unackArgsForCall []struct { + arg1 context.Context + arg2 *task.Task + } + unackReturns struct { + result1 error + } + unackReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeQueue) Ack(arg1 context.Context, arg2 *task.Task) error { + fake.ackMutex.Lock() + ret, specificReturn := fake.ackReturnsOnCall[len(fake.ackArgsForCall)] + fake.ackArgsForCall = append(fake.ackArgsForCall, struct { + arg1 context.Context + arg2 *task.Task + }{arg1, arg2}) + stub := fake.AckStub + fakeReturns := fake.ackReturns + fake.recordInvocation("Ack", []interface{}{arg1, arg2}) + fake.ackMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeQueue) AckCallCount() int { + fake.ackMutex.RLock() + defer fake.ackMutex.RUnlock() + return len(fake.ackArgsForCall) +} + +func (fake *FakeQueue) AckCalls(stub func(context.Context, *task.Task) error) { + fake.ackMutex.Lock() + defer fake.ackMutex.Unlock() + fake.AckStub = stub +} + +func (fake *FakeQueue) AckArgsForCall(i int) (context.Context, *task.Task) { + fake.ackMutex.RLock() + defer fake.ackMutex.RUnlock() + argsForCall := fake.ackArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeQueue) AckReturns(result1 error) { + fake.ackMutex.Lock() + defer fake.ackMutex.Unlock() + fake.AckStub = nil + fake.ackReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) AckReturnsOnCall(i int, result1 error) { + fake.ackMutex.Lock() + defer fake.ackMutex.Unlock() + fake.AckStub = nil + if fake.ackReturnsOnCall == nil { + fake.ackReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.ackReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) Add(arg1 context.Context, arg2 *task.Task) error { + fake.addMutex.Lock() + ret, specificReturn := fake.addReturnsOnCall[len(fake.addArgsForCall)] + fake.addArgsForCall = append(fake.addArgsForCall, struct { + arg1 context.Context + arg2 *task.Task + }{arg1, arg2}) + stub := fake.AddStub + fakeReturns := fake.addReturns + fake.recordInvocation("Add", []interface{}{arg1, arg2}) + fake.addMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeQueue) AddCallCount() int { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + return len(fake.addArgsForCall) +} + +func (fake *FakeQueue) AddCalls(stub func(context.Context, *task.Task) error) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() + fake.AddStub = stub +} + +func (fake *FakeQueue) AddArgsForCall(i int) (context.Context, *task.Task) { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + argsForCall := fake.addArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeQueue) AddReturns(result1 error) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() + fake.AddStub = nil + fake.addReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) AddReturnsOnCall(i int, result1 error) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() + fake.AddStub = nil + if fake.addReturnsOnCall == nil { + fake.addReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.addReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) Poll(arg1 context.Context) (*task.Task, error) { + fake.pollMutex.Lock() + ret, specificReturn := fake.pollReturnsOnCall[len(fake.pollArgsForCall)] + fake.pollArgsForCall = append(fake.pollArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.PollStub + fakeReturns := fake.pollReturns + fake.recordInvocation("Poll", []interface{}{arg1}) + fake.pollMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeQueue) PollCallCount() int { + fake.pollMutex.RLock() + defer fake.pollMutex.RUnlock() + return len(fake.pollArgsForCall) +} + +func (fake *FakeQueue) PollCalls(stub func(context.Context) (*task.Task, error)) { + fake.pollMutex.Lock() + defer fake.pollMutex.Unlock() + fake.PollStub = stub +} + +func (fake *FakeQueue) PollArgsForCall(i int) context.Context { + fake.pollMutex.RLock() + defer fake.pollMutex.RUnlock() + argsForCall := fake.pollArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeQueue) PollReturns(result1 *task.Task, result2 error) { + fake.pollMutex.Lock() + defer fake.pollMutex.Unlock() + fake.PollStub = nil + fake.pollReturns = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeQueue) PollReturnsOnCall(i int, result1 *task.Task, result2 error) { + fake.pollMutex.Lock() + defer fake.pollMutex.Unlock() + fake.PollStub = nil + if fake.pollReturnsOnCall == nil { + fake.pollReturnsOnCall = make(map[int]struct { + result1 *task.Task + result2 error + }) + } + fake.pollReturnsOnCall[i] = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeQueue) Unack(arg1 context.Context, arg2 *task.Task) error { + fake.unackMutex.Lock() + ret, specificReturn := fake.unackReturnsOnCall[len(fake.unackArgsForCall)] + fake.unackArgsForCall = append(fake.unackArgsForCall, struct { + arg1 context.Context + arg2 *task.Task + }{arg1, arg2}) + stub := fake.UnackStub + fakeReturns := fake.unackReturns + fake.recordInvocation("Unack", []interface{}{arg1, arg2}) + fake.unackMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeQueue) UnackCallCount() int { + fake.unackMutex.RLock() + defer fake.unackMutex.RUnlock() + return len(fake.unackArgsForCall) +} + +func (fake *FakeQueue) UnackCalls(stub func(context.Context, *task.Task) error) { + fake.unackMutex.Lock() + defer fake.unackMutex.Unlock() + fake.UnackStub = stub +} + +func (fake *FakeQueue) UnackArgsForCall(i int) (context.Context, *task.Task) { + fake.unackMutex.RLock() + defer fake.unackMutex.RUnlock() + argsForCall := fake.unackArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeQueue) UnackReturns(result1 error) { + fake.unackMutex.Lock() + defer fake.unackMutex.Unlock() + fake.UnackStub = nil + fake.unackReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) UnackReturnsOnCall(i int, result1 error) { + fake.unackMutex.Lock() + defer fake.unackMutex.Unlock() + fake.UnackStub = nil + if fake.unackReturnsOnCall == nil { + fake.unackReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.unackReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.ackMutex.RLock() + defer fake.ackMutex.RUnlock() + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + fake.pollMutex.RLock() + defer fake.pollMutex.RUnlock() + fake.unackMutex.RLock() + defer fake.unackMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeQueue) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ task.Queue = new(FakeQueue) diff --git a/internal/service/task/taskfakes/fake_storage.go b/internal/service/task/taskfakes/fake_storage.go new file mode 100644 index 0000000000000000000000000000000000000000..1570d8bfa71bacb2cee9e434c037fdab3bda993f --- /dev/null +++ b/internal/service/task/taskfakes/fake_storage.go @@ -0,0 +1,281 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package taskfakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type FakeStorage struct { + TaskStub func(context.Context, string) (*task.Task, error) + taskMutex sync.RWMutex + taskArgsForCall []struct { + arg1 context.Context + arg2 string + } + taskReturns struct { + result1 *task.Task + result2 error + } + taskReturnsOnCall map[int]struct { + result1 *task.Task + result2 error + } + TaskHistoryStub func(context.Context, string) (*task.Task, error) + taskHistoryMutex sync.RWMutex + taskHistoryArgsForCall []struct { + arg1 context.Context + arg2 string + } + taskHistoryReturns struct { + result1 *task.Task + result2 error + } + taskHistoryReturnsOnCall map[int]struct { + result1 *task.Task + result2 error + } + TaskTemplateStub func(context.Context, string) (*task.Task, error) + taskTemplateMutex sync.RWMutex + taskTemplateArgsForCall []struct { + arg1 context.Context + arg2 string + } + taskTemplateReturns struct { + result1 *task.Task + result2 error + } + taskTemplateReturnsOnCall map[int]struct { + result1 *task.Task + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeStorage) Task(arg1 context.Context, arg2 string) (*task.Task, error) { + fake.taskMutex.Lock() + ret, specificReturn := fake.taskReturnsOnCall[len(fake.taskArgsForCall)] + fake.taskArgsForCall = append(fake.taskArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.TaskStub + fakeReturns := fake.taskReturns + fake.recordInvocation("Task", []interface{}{arg1, arg2}) + fake.taskMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) TaskCallCount() int { + fake.taskMutex.RLock() + defer fake.taskMutex.RUnlock() + return len(fake.taskArgsForCall) +} + +func (fake *FakeStorage) TaskCalls(stub func(context.Context, string) (*task.Task, error)) { + fake.taskMutex.Lock() + defer fake.taskMutex.Unlock() + fake.TaskStub = stub +} + +func (fake *FakeStorage) TaskArgsForCall(i int) (context.Context, string) { + fake.taskMutex.RLock() + defer fake.taskMutex.RUnlock() + argsForCall := fake.taskArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) TaskReturns(result1 *task.Task, result2 error) { + fake.taskMutex.Lock() + defer fake.taskMutex.Unlock() + fake.TaskStub = nil + fake.taskReturns = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskReturnsOnCall(i int, result1 *task.Task, result2 error) { + fake.taskMutex.Lock() + defer fake.taskMutex.Unlock() + fake.TaskStub = nil + if fake.taskReturnsOnCall == nil { + fake.taskReturnsOnCall = make(map[int]struct { + result1 *task.Task + result2 error + }) + } + fake.taskReturnsOnCall[i] = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskHistory(arg1 context.Context, arg2 string) (*task.Task, error) { + fake.taskHistoryMutex.Lock() + ret, specificReturn := fake.taskHistoryReturnsOnCall[len(fake.taskHistoryArgsForCall)] + fake.taskHistoryArgsForCall = append(fake.taskHistoryArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.TaskHistoryStub + fakeReturns := fake.taskHistoryReturns + fake.recordInvocation("TaskHistory", []interface{}{arg1, arg2}) + fake.taskHistoryMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) TaskHistoryCallCount() int { + fake.taskHistoryMutex.RLock() + defer fake.taskHistoryMutex.RUnlock() + return len(fake.taskHistoryArgsForCall) +} + +func (fake *FakeStorage) TaskHistoryCalls(stub func(context.Context, string) (*task.Task, error)) { + fake.taskHistoryMutex.Lock() + defer fake.taskHistoryMutex.Unlock() + fake.TaskHistoryStub = stub +} + +func (fake *FakeStorage) TaskHistoryArgsForCall(i int) (context.Context, string) { + fake.taskHistoryMutex.RLock() + defer fake.taskHistoryMutex.RUnlock() + argsForCall := fake.taskHistoryArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) TaskHistoryReturns(result1 *task.Task, result2 error) { + fake.taskHistoryMutex.Lock() + defer fake.taskHistoryMutex.Unlock() + fake.TaskHistoryStub = nil + fake.taskHistoryReturns = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskHistoryReturnsOnCall(i int, result1 *task.Task, result2 error) { + fake.taskHistoryMutex.Lock() + defer fake.taskHistoryMutex.Unlock() + fake.TaskHistoryStub = nil + if fake.taskHistoryReturnsOnCall == nil { + fake.taskHistoryReturnsOnCall = make(map[int]struct { + result1 *task.Task + result2 error + }) + } + fake.taskHistoryReturnsOnCall[i] = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskTemplate(arg1 context.Context, arg2 string) (*task.Task, error) { + fake.taskTemplateMutex.Lock() + ret, specificReturn := fake.taskTemplateReturnsOnCall[len(fake.taskTemplateArgsForCall)] + fake.taskTemplateArgsForCall = append(fake.taskTemplateArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.TaskTemplateStub + fakeReturns := fake.taskTemplateReturns + fake.recordInvocation("TaskTemplate", []interface{}{arg1, arg2}) + fake.taskTemplateMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) TaskTemplateCallCount() int { + fake.taskTemplateMutex.RLock() + defer fake.taskTemplateMutex.RUnlock() + return len(fake.taskTemplateArgsForCall) +} + +func (fake *FakeStorage) TaskTemplateCalls(stub func(context.Context, string) (*task.Task, error)) { + fake.taskTemplateMutex.Lock() + defer fake.taskTemplateMutex.Unlock() + fake.TaskTemplateStub = stub +} + +func (fake *FakeStorage) TaskTemplateArgsForCall(i int) (context.Context, string) { + fake.taskTemplateMutex.RLock() + defer fake.taskTemplateMutex.RUnlock() + argsForCall := fake.taskTemplateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) TaskTemplateReturns(result1 *task.Task, result2 error) { + fake.taskTemplateMutex.Lock() + defer fake.taskTemplateMutex.Unlock() + fake.TaskTemplateStub = nil + fake.taskTemplateReturns = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) TaskTemplateReturnsOnCall(i int, result1 *task.Task, result2 error) { + fake.taskTemplateMutex.Lock() + defer fake.taskTemplateMutex.Unlock() + fake.TaskTemplateStub = nil + if fake.taskTemplateReturnsOnCall == nil { + fake.taskTemplateReturnsOnCall = make(map[int]struct { + result1 *task.Task + result2 error + }) + } + fake.taskTemplateReturnsOnCall[i] = struct { + result1 *task.Task + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.taskMutex.RLock() + defer fake.taskMutex.RUnlock() + fake.taskHistoryMutex.RLock() + defer fake.taskHistoryMutex.RUnlock() + fake.taskTemplateMutex.RLock() + defer fake.taskTemplateMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeStorage) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ task.Storage = new(FakeStorage) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 0e314d761b3cf3ce6f95a2ae959b180a6f7a6c54..f5b44125ce771b4bec90965e4637bd8e115906b9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -118,3 +118,43 @@ func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error { b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) return backoff.Retry(insert, b) } + +func (s *Storage) Task(ctx context.Context, taskID string) (*task.Task, error) { + result := s.tasks.FindOne(ctx, bson.M{ + "id": taskID, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +} + +func (s *Storage) TaskHistory(ctx context.Context, taskID string) (*task.Task, error) { + result := s.tasksHistory.FindOne(ctx, bson.M{ + "id": taskID, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +} diff --git a/vendor/code.vereign.com/gaiax/tsa/golib/ptr/ptr.go b/vendor/code.vereign.com/gaiax/tsa/golib/ptr/ptr.go new file mode 100644 index 0000000000000000000000000000000000000000..fd522b7a70d7449dccc01ee3b2b3181eb9e7600e Binary files /dev/null and b/vendor/code.vereign.com/gaiax/tsa/golib/ptr/ptr.go differ diff --git a/vendor/github.com/davecgh/go-spew/LICENSE b/vendor/github.com/davecgh/go-spew/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..bc52e96f2b0ea97cc450e2fefbbb4cc430d1ac5a Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/LICENSE differ diff --git a/vendor/github.com/davecgh/go-spew/spew/bypass.go b/vendor/github.com/davecgh/go-spew/spew/bypass.go new file mode 100644 index 0000000000000000000000000000000000000000..792994785e36ca74c5545a0d93a2cdecda006678 Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/bypass.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/bypasssafe.go b/vendor/github.com/davecgh/go-spew/spew/bypasssafe.go new file mode 100644 index 0000000000000000000000000000000000000000..205c28d68c474e4497e6aa1ce8b9fdeb260f4586 Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/bypasssafe.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/common.go b/vendor/github.com/davecgh/go-spew/spew/common.go new file mode 100644 index 0000000000000000000000000000000000000000..1be8ce9457612e02a64c01b2321d087ebd6415f2 Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/common.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/config.go b/vendor/github.com/davecgh/go-spew/spew/config.go new file mode 100644 index 0000000000000000000000000000000000000000..2e3d22f312026ff2c863bbffcbc88b7f6fb942f5 Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/config.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/doc.go b/vendor/github.com/davecgh/go-spew/spew/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..aacaac6f1e1e936ee0022c00e139756c9bdc2b3e Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/doc.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/dump.go b/vendor/github.com/davecgh/go-spew/spew/dump.go new file mode 100644 index 0000000000000000000000000000000000000000..f78d89fc1f6c454df58cd1e346817db6e30c4299 Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/dump.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/format.go b/vendor/github.com/davecgh/go-spew/spew/format.go new file mode 100644 index 0000000000000000000000000000000000000000..b04edb7d7ac278ae0b873a1335f37822a00bfd7c Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/format.go differ diff --git a/vendor/github.com/davecgh/go-spew/spew/spew.go b/vendor/github.com/davecgh/go-spew/spew/spew.go new file mode 100644 index 0000000000000000000000000000000000000000..32c0e338825308f6b9b4d0407aa5682a23e2dc9c Binary files /dev/null and b/vendor/github.com/davecgh/go-spew/spew/spew.go differ diff --git a/vendor/github.com/pmezard/go-difflib/LICENSE b/vendor/github.com/pmezard/go-difflib/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..c67dad612a3dfca2b84599c640798d7be7d46728 Binary files /dev/null and b/vendor/github.com/pmezard/go-difflib/LICENSE differ diff --git a/vendor/github.com/pmezard/go-difflib/difflib/difflib.go b/vendor/github.com/pmezard/go-difflib/difflib/difflib.go new file mode 100644 index 0000000000000000000000000000000000000000..003e99fadb4f189565b409b9509ecf30b752d25a Binary files /dev/null and b/vendor/github.com/pmezard/go-difflib/difflib/difflib.go differ diff --git a/vendor/github.com/stretchr/testify/LICENSE b/vendor/github.com/stretchr/testify/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..4b0421cf9ee47908beae4b4648babb75b09ee028 Binary files /dev/null and b/vendor/github.com/stretchr/testify/LICENSE differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_compare.go b/vendor/github.com/stretchr/testify/assert/assertion_compare.go new file mode 100644 index 0000000000000000000000000000000000000000..41649d26792461a0e999695e0c91a15d72b5898a Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_compare.go differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_format.go b/vendor/github.com/stretchr/testify/assert/assertion_format.go new file mode 100644 index 0000000000000000000000000000000000000000..4dfd1229a8617f401e11efa0ad461447f31c1b3e Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_format.go differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_format.go.tmpl b/vendor/github.com/stretchr/testify/assert/assertion_format.go.tmpl new file mode 100644 index 0000000000000000000000000000000000000000..d2bb0b81778858c364f4b3694c00cdd4c72b1c5b Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_format.go.tmpl differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_forward.go b/vendor/github.com/stretchr/testify/assert/assertion_forward.go new file mode 100644 index 0000000000000000000000000000000000000000..25337a6f07e6e05f3f29e5493cc2ba71cc474abb Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_forward.go differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_forward.go.tmpl b/vendor/github.com/stretchr/testify/assert/assertion_forward.go.tmpl new file mode 100644 index 0000000000000000000000000000000000000000..188bb9e174397295062da708cc9f5207e2331768 Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_forward.go.tmpl differ diff --git a/vendor/github.com/stretchr/testify/assert/assertion_order.go b/vendor/github.com/stretchr/testify/assert/assertion_order.go new file mode 100644 index 0000000000000000000000000000000000000000..1c3b47182a726afbfb1890c5119144bad1bcf8c9 Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertion_order.go differ diff --git a/vendor/github.com/stretchr/testify/assert/assertions.go b/vendor/github.com/stretchr/testify/assert/assertions.go new file mode 100644 index 0000000000000000000000000000000000000000..bcac4401f57fb271d4a0909e607d56d51c606e59 Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/assertions.go differ diff --git a/vendor/github.com/stretchr/testify/assert/doc.go b/vendor/github.com/stretchr/testify/assert/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..c9dccc4d6cd0aad89a9ecf638d8cde1ea043a37a Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/doc.go differ diff --git a/vendor/github.com/stretchr/testify/assert/errors.go b/vendor/github.com/stretchr/testify/assert/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..ac9dc9d1d6156b64c31ac0b130e7a2b1ca86f06d Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/errors.go differ diff --git a/vendor/github.com/stretchr/testify/assert/forward_assertions.go b/vendor/github.com/stretchr/testify/assert/forward_assertions.go new file mode 100644 index 0000000000000000000000000000000000000000..df189d2348f17a3d16888e2581d2a3b7a9d47e93 Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/forward_assertions.go differ diff --git a/vendor/github.com/stretchr/testify/assert/http_assertions.go b/vendor/github.com/stretchr/testify/assert/http_assertions.go new file mode 100644 index 0000000000000000000000000000000000000000..4ed341dd28934c102aa7a40c74ee24b6555c1db1 Binary files /dev/null and b/vendor/github.com/stretchr/testify/assert/http_assertions.go differ diff --git a/vendor/gopkg.in/yaml.v3/LICENSE b/vendor/gopkg.in/yaml.v3/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..2683e4bb1f24c14aa2791e6d48ce0ecf3d8ab756 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/LICENSE differ diff --git a/vendor/gopkg.in/yaml.v3/NOTICE b/vendor/gopkg.in/yaml.v3/NOTICE new file mode 100644 index 0000000000000000000000000000000000000000..866d74a7ad79165312a2ce3904b4bdb53e6aedf7 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/NOTICE differ diff --git a/vendor/gopkg.in/yaml.v3/README.md b/vendor/gopkg.in/yaml.v3/README.md new file mode 100644 index 0000000000000000000000000000000000000000..08eb1babddfac3d8f4e006448496d0e0d1f8d720 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/README.md differ diff --git a/vendor/gopkg.in/yaml.v3/apic.go b/vendor/gopkg.in/yaml.v3/apic.go new file mode 100644 index 0000000000000000000000000000000000000000..ae7d049f182ae2419ded608e4c763487c99dff52 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/apic.go differ diff --git a/vendor/gopkg.in/yaml.v3/decode.go b/vendor/gopkg.in/yaml.v3/decode.go new file mode 100644 index 0000000000000000000000000000000000000000..df36e3a30f55508515759037e072f79fc9e9e969 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/decode.go differ diff --git a/vendor/gopkg.in/yaml.v3/emitterc.go b/vendor/gopkg.in/yaml.v3/emitterc.go new file mode 100644 index 0000000000000000000000000000000000000000..0f47c9ca8addf8e9d2e454e02842927ae825d0e9 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/emitterc.go differ diff --git a/vendor/gopkg.in/yaml.v3/encode.go b/vendor/gopkg.in/yaml.v3/encode.go new file mode 100644 index 0000000000000000000000000000000000000000..de9e72a3e638d166e96ceab3d77ce59afe6e6f8a Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/encode.go differ diff --git a/vendor/gopkg.in/yaml.v3/parserc.go b/vendor/gopkg.in/yaml.v3/parserc.go new file mode 100644 index 0000000000000000000000000000000000000000..ac66fccc059e3837d17e2a3a1bec5b6d5c398ab1 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/parserc.go differ diff --git a/vendor/gopkg.in/yaml.v3/readerc.go b/vendor/gopkg.in/yaml.v3/readerc.go new file mode 100644 index 0000000000000000000000000000000000000000..b7de0a89c462af605f889bc46ce165e5d4238add Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/readerc.go differ diff --git a/vendor/gopkg.in/yaml.v3/resolve.go b/vendor/gopkg.in/yaml.v3/resolve.go new file mode 100644 index 0000000000000000000000000000000000000000..64ae888057a5aa24c5a3a6ca0fcb08a06269e3ad Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/resolve.go differ diff --git a/vendor/gopkg.in/yaml.v3/scannerc.go b/vendor/gopkg.in/yaml.v3/scannerc.go new file mode 100644 index 0000000000000000000000000000000000000000..ca0070108f4ebe6a09a222075267e0ffca996e72 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/scannerc.go differ diff --git a/vendor/gopkg.in/yaml.v3/sorter.go b/vendor/gopkg.in/yaml.v3/sorter.go new file mode 100644 index 0000000000000000000000000000000000000000..9210ece7e97232891625ed08c549b92c0e9bb169 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/sorter.go differ diff --git a/vendor/gopkg.in/yaml.v3/writerc.go b/vendor/gopkg.in/yaml.v3/writerc.go new file mode 100644 index 0000000000000000000000000000000000000000..b8a116bf9a22b9911958f44904289a8c6b482bd2 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/writerc.go differ diff --git a/vendor/gopkg.in/yaml.v3/yaml.go b/vendor/gopkg.in/yaml.v3/yaml.go new file mode 100644 index 0000000000000000000000000000000000000000..8cec6da48d3ec4d8858ca622383c75e359faee1f Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/yaml.go differ diff --git a/vendor/gopkg.in/yaml.v3/yamlh.go b/vendor/gopkg.in/yaml.v3/yamlh.go new file mode 100644 index 0000000000000000000000000000000000000000..7c6d0077061933c97979f6c84cb659b17391e1a3 Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/yamlh.go differ diff --git a/vendor/gopkg.in/yaml.v3/yamlprivateh.go b/vendor/gopkg.in/yaml.v3/yamlprivateh.go new file mode 100644 index 0000000000000000000000000000000000000000..e88f9c54aecb54ed42665b2a08b66a4f03d999bc Binary files /dev/null and b/vendor/gopkg.in/yaml.v3/yamlprivateh.go differ diff --git a/vendor/modules.txt b/vendor/modules.txt index e4d7aa31821205e1e8eb7e7920a4d8185d08933e..a4f5adfeefcfa922f4b4ed8ddaa1c00660469c5c 100644 Binary files a/vendor/modules.txt and b/vendor/modules.txt differ