diff --git a/cmd/task/main.go b/cmd/task/main.go index a349fd17c7b7cb703187fde31e9cb172592a528a..e31f03b83bafe6af209aa2a8d75d008fe0418428 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -90,7 +90,7 @@ func main() { healthSvc goahealth.Service ) { - taskSvc = task.New(storage, storage, logger) + taskSvc = task.New(storage, storage, cache, logger) healthSvc = health.New() } diff --git a/design/design.go b/design/design.go index 0e21586fbef9ea8846c3c78c87d649076b2398c8..f78f0680296dcbe79ae5a2edd5e413d12446c117 100644 --- a/design/design.go +++ b/design/design.go @@ -37,6 +37,16 @@ var _ = Service("task", func() { Response(StatusOK) }) }) + + Method("TaskResult", func() { + Description("TaskResult retrieves task result from the Cache service.") + Payload(TaskResultRequest) + Result(Any) + HTTP(func() { + GET("/v1/taskResult/{taskID}") + Response(StatusOK) + }) + }) }) var _ = Service("health", func() { diff --git a/design/types.go b/design/types.go index ed8662a42b4811224176ed58405dafc801b6f082..7625fe78437e5f08fb4a3a4c2c9cce2f699b316f 100644 --- a/design/types.go +++ b/design/types.go @@ -15,3 +15,8 @@ var CreateResult = Type("CreateResult", func() { Field(1, "taskID", String, "Unique task identifier.") Required("taskID") }) + +var TaskResultRequest = Type("TaskResultRequest", func() { + Field(1, "taskID", String, "Unique task identifier.") + Required("taskID") +}) diff --git a/gen/http/cli/task/cli.go b/gen/http/cli/task/cli.go index c859f0e4edee502712eaaba346baa5782937eddc..ca12630d5df90424c74955045a70afd451dcb583 100644 --- a/gen/http/cli/task/cli.go +++ b/gen/http/cli/task/cli.go @@ -25,14 +25,14 @@ import ( // func UsageCommands() string { return `health (liveness|readiness) -task create +task (create|task-result) ` } // UsageExamples produces an example of a valid invocation of the CLI tool. func UsageExamples() string { return os.Args[0] + ` health liveness` + "\n" + - os.Args[0] + ` task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae."` + "\n" + + os.Args[0] + ` task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et."` + "\n" + "" } @@ -59,6 +59,9 @@ func ParseEndpoint( taskCreateTaskNameFlag = taskCreateFlags.String("task-name", "REQUIRED", "Task name.") taskCreateCacheNamespaceFlag = taskCreateFlags.String("cache-namespace", "", "") taskCreateCacheScopeFlag = taskCreateFlags.String("cache-scope", "", "") + + taskTaskResultFlags = flag.NewFlagSet("task-result", flag.ExitOnError) + taskTaskResultTaskIDFlag = taskTaskResultFlags.String("task-id", "REQUIRED", "Unique task identifier.") ) healthFlags.Usage = healthUsage healthLivenessFlags.Usage = healthLivenessUsage @@ -66,6 +69,7 @@ func ParseEndpoint( taskFlags.Usage = taskUsage taskCreateFlags.Usage = taskCreateUsage + taskTaskResultFlags.Usage = taskTaskResultUsage if err := flag.CommandLine.Parse(os.Args[1:]); err != nil { return nil, nil, err @@ -116,6 +120,9 @@ func ParseEndpoint( case "create": epf = taskCreateFlags + case "task-result": + epf = taskTaskResultFlags + } } @@ -154,6 +161,9 @@ func ParseEndpoint( case "create": endpoint = c.Create() data, err = taskc.BuildCreatePayload(*taskCreateBodyFlag, *taskCreateTaskNameFlag, *taskCreateCacheNamespaceFlag, *taskCreateCacheScopeFlag) + case "task-result": + endpoint = c.TaskResult() + data, err = taskc.BuildTaskResultPayload(*taskTaskResultTaskIDFlag) } } } @@ -206,6 +216,7 @@ Usage: COMMAND: create: Create a task and put it in a queue for execution. + task-result: TaskResult retrieves task result from the Cache service. Additional help: %[1]s task COMMAND --help @@ -221,6 +232,17 @@ Create a task and put it in a queue for execution. -cache-scope STRING: Example: - %[1]s task create --body "Nam et." --task-name "Minus reprehenderit non quo nihil adipisci." --cache-namespace "Pariatur sequi et." --cache-scope "Corrupti facere sequi tempora eius assumenda molestiae." + %[1]s task create --body "Eaque excepturi suscipit veritatis nemo." --task-name "Corrupti facere sequi tempora eius assumenda molestiae." --cache-namespace "Laborum sapiente." --cache-scope "Nam et." +`, os.Args[0]) +} + +func taskTaskResultUsage() { + fmt.Fprintf(os.Stderr, `%[1]s [flags] task task-result -task-id STRING + +TaskResult retrieves task result from the Cache service. + -task-id STRING: Unique task identifier. + +Example: + %[1]s task task-result --task-id "Et ipsa voluptate." `, os.Args[0]) } diff --git a/gen/http/openapi.json b/gen/http/openapi.json index cbf1687e983ec137eb1e5d86c0eefec31be36b3d..78e36c4475d9a753284914cfcbc51172644ffe72 100644 --- a/gen/http/openapi.json +++ b/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Eaque excepturi suscipit veritatis nemo."}},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."},"required":["taskID"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":""},"host":"localhost:8082","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}},"schemes":["http"]}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"type":"string"},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","required":false,"type":"string"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","required":false,"type":"string"},{"name":"any","in":"body","description":"Data contains JSON payload that will be used for task execution.","required":true,"schema":{"type":"string","format":"binary"}}],"responses":{"200":{"description":"OK response.","schema":{"$ref":"#/definitions/TaskCreateResponseBody","required":["taskID"]}}},"schemes":["http"]}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","format":"binary"}}},"schemes":["http"]}}},"definitions":{"TaskCreateResponseBody":{"title":"TaskCreateResponseBody","type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Vel odio et doloribus est quod laborum."}},"example":{"taskID":"Harum aut autem aliquam dolorem non soluta."},"required":["taskID"]}}} \ No newline at end of file diff --git a/gen/http/openapi.yaml b/gen/http/openapi.yaml index ca530f83c341213f3f961f435c8ecf88d4d75f6e..1ccd3b61d33f7ff620ecc4d959899033d4d70ded 100644 --- a/gen/http/openapi.yaml +++ b/gen/http/openapi.yaml @@ -74,6 +74,27 @@ paths: - taskID schemes: - http + /v1/taskResult/{taskID}: + get: + tags: + - task + summary: TaskResult task + description: TaskResult retrieves task result from the Cache service. + operationId: task#TaskResult + parameters: + - name: taskID + in: path + description: Unique task identifier. + required: true + type: string + responses: + "200": + description: OK response. + schema: + type: string + format: binary + schemes: + - http definitions: TaskCreateResponseBody: title: TaskCreateResponseBody @@ -82,8 +103,8 @@ definitions: taskID: type: string description: Unique task identifier. - example: Eaque excepturi suscipit veritatis nemo. + example: Vel odio et doloribus est quod laborum. example: - taskID: Corrupti quia autem dolorum sunt aperiam quaerat. + taskID: Harum aut autem aliquam dolorem non soluta. required: - taskID diff --git a/gen/http/openapi3.json b/gen/http/openapi3.json index deb72810cb4565839f30f5ddb709475fc8d2bebf..926387963b08082fda67874bb159f6e86d4c8295 100644 --- a/gen/http/openapi3.json +++ b/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Vel odio et doloribus est quod laborum."},"example":"Harum aut autem aliquam dolorem non soluta."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Eveniet et eligendi sint quibusdam quia maxime.","format":"binary"},"example":"Ipsam et est accusantium."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Facere quibusdam voluptate beatae."}}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Et ipsa voluptate."}},"example":{"taskID":"Quo qui fuga impedit eos fuga et."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Task Service","description":"The task service is executing tasks created from policies.","version":"1.0"},"servers":[{"url":"http://localhost:8082","description":"Task Server"}],"paths":{"/liveness":{"get":{"tags":["health"],"summary":"Liveness health","operationId":"health#Liveness","responses":{"200":{"description":"OK response."}}}},"/readiness":{"get":{"tags":["health"],"summary":"Readiness health","operationId":"health#Readiness","responses":{"200":{"description":"OK response."}}}},"/v1/task/{taskName}":{"post":{"tags":["task"],"summary":"Create task","description":"Create a task and put it in a queue for execution.","operationId":"task#Create","parameters":[{"name":"taskName","in":"path","description":"Task name.","required":true,"schema":{"type":"string","description":"Task name.","example":"Aut qui aut itaque et commodi vel."},"example":"Sapiente et."},{"name":"x-cache-namespace","in":"header","description":"Cache key namespace","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key namespace","example":"login"},"example":"login"},{"name":"x-cache-scope","in":"header","description":"Cache key scope","allowEmptyValue":true,"schema":{"type":"string","description":"Cache key scope","example":"user"},"example":"user"}],"requestBody":{"description":"Data contains JSON payload that will be used for task execution.","required":true,"content":{"application/json":{"schema":{"type":"string","description":"Data contains JSON payload that will be used for task execution.","example":"Excepturi aut consequatur animi rerum.","format":"binary"},"example":"Dolorem illo officiis ipsa impedit harum et."}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CreateResult"},"example":{"taskID":"Corrupti quia autem dolorum sunt aperiam quaerat."}}}}}}},"/v1/taskResult/{taskID}":{"get":{"tags":["task"],"summary":"TaskResult task","description":"TaskResult retrieves task result from the Cache service.","operationId":"task#TaskResult","parameters":[{"name":"taskID","in":"path","description":"Unique task identifier.","required":true,"schema":{"type":"string","description":"Unique task identifier.","example":"Delectus natus eos."},"example":"Quae ut dolores ab."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","example":"Illum quidem sapiente sed velit.","format":"binary"},"example":"Omnis commodi reiciendis eum non."}}}}}}},"components":{"schemas":{"CreateResult":{"type":"object","properties":{"taskID":{"type":"string","description":"Unique task identifier.","example":"Dolores atque error ab."}},"example":{"taskID":"Laboriosam perspiciatis vitae numquam."},"required":["taskID"]}}},"tags":[{"name":"health","description":"Health service provides health check endpoints."},{"name":"task","description":"Task service provides endpoints to work with tasks."}]} \ No newline at end of file diff --git a/gen/http/openapi3.yaml b/gen/http/openapi3.yaml index 8e14b7c0de5748cbd7901c7eea9745a8feb357ca..7822203e80727be6ad349a505c7520f5f53bf1be 100644 --- a/gen/http/openapi3.yaml +++ b/gen/http/openapi3.yaml @@ -40,8 +40,8 @@ paths: schema: type: string description: Task name. - example: Vel odio et doloribus est quod laborum. - example: Harum aut autem aliquam dolorem non soluta. + example: Aut qui aut itaque et commodi vel. + example: Sapiente et. - name: x-cache-namespace in: header description: Cache key namespace @@ -68,9 +68,9 @@ paths: schema: type: string description: Data contains JSON payload that will be used for task execution. - example: Eveniet et eligendi sint quibusdam quia maxime. + example: Excepturi aut consequatur animi rerum. format: binary - example: Ipsam et est accusantium. + example: Dolorem illo officiis ipsa impedit harum et. responses: "200": description: OK response. @@ -79,7 +79,34 @@ paths: schema: $ref: '#/components/schemas/CreateResult' example: - taskID: Facere quibusdam voluptate beatae. + taskID: Corrupti quia autem dolorum sunt aperiam quaerat. + /v1/taskResult/{taskID}: + get: + tags: + - task + summary: TaskResult task + description: TaskResult retrieves task result from the Cache service. + operationId: task#TaskResult + parameters: + - name: taskID + in: path + description: Unique task identifier. + required: true + schema: + type: string + description: Unique task identifier. + example: Delectus natus eos. + example: Quae ut dolores ab. + responses: + "200": + description: OK response. + content: + application/json: + schema: + type: string + example: Illum quidem sapiente sed velit. + format: binary + example: Omnis commodi reiciendis eum non. components: schemas: CreateResult: @@ -88,9 +115,9 @@ components: taskID: type: string description: Unique task identifier. - example: Et ipsa voluptate. + example: Dolores atque error ab. example: - taskID: Quo qui fuga impedit eos fuga et. + taskID: Laboriosam perspiciatis vitae numquam. required: - taskID tags: diff --git a/gen/http/task/client/cli.go b/gen/http/task/client/cli.go index b6ff7fb25a49ddd34dfa22013d4c4d398536335f..f4e478f9179ff782d6cf169ef51804e1f7d6eed9 100644 --- a/gen/http/task/client/cli.go +++ b/gen/http/task/client/cli.go @@ -22,7 +22,7 @@ func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCr { err = json.Unmarshal([]byte(taskCreateBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Nam et.\"") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "\"Eaque excepturi suscipit veritatis nemo.\"") } } var taskName string @@ -51,3 +51,16 @@ func BuildCreatePayload(taskCreateBody string, taskCreateTaskName string, taskCr return res, nil } + +// BuildTaskResultPayload builds the payload for the task TaskResult endpoint +// from CLI flags. +func BuildTaskResultPayload(taskTaskResultTaskID string) (*task.TaskResultRequest, error) { + var taskID string + { + taskID = taskTaskResultTaskID + } + v := &task.TaskResultRequest{} + v.TaskID = taskID + + return v, nil +} diff --git a/gen/http/task/client/client.go b/gen/http/task/client/client.go index 2e27b44317476b4161315e912950aa8520a444ca..6a7fd72c35116d724783150081a061e53aeb0806 100644 --- a/gen/http/task/client/client.go +++ b/gen/http/task/client/client.go @@ -20,6 +20,10 @@ type Client struct { // Create Doer is the HTTP client used to make requests to the Create endpoint. CreateDoer goahttp.Doer + // TaskResult Doer is the HTTP client used to make requests to the TaskResult + // endpoint. + TaskResultDoer goahttp.Doer + // RestoreResponseBody controls whether the response bodies are reset after // decoding so they can be read again. RestoreResponseBody bool @@ -41,6 +45,7 @@ func NewClient( ) *Client { return &Client{ CreateDoer: doer, + TaskResultDoer: doer, RestoreResponseBody: restoreBody, scheme: scheme, host: host, @@ -72,3 +77,22 @@ func (c *Client) Create() goa.Endpoint { return decodeResponse(resp) } } + +// TaskResult returns an endpoint that makes HTTP requests to the task service +// TaskResult server. +func (c *Client) TaskResult() goa.Endpoint { + var ( + decodeResponse = DecodeTaskResultResponse(c.decoder, c.RestoreResponseBody) + ) + return func(ctx context.Context, v interface{}) (interface{}, error) { + req, err := c.BuildTaskResultRequest(ctx, v) + if err != nil { + return nil, err + } + resp, err := c.TaskResultDoer.Do(req) + if err != nil { + return nil, goahttp.ErrRequestError("task", "TaskResult", err) + } + return decodeResponse(resp) + } +} diff --git a/gen/http/task/client/encode_decode.go b/gen/http/task/client/encode_decode.go index 52ac5bd62e0c8889272354c4336f550af8ea593f..a7ecbf5caea14083c7bcfdac4483920e08fa1835 100644 --- a/gen/http/task/client/encode_decode.go +++ b/gen/http/task/client/encode_decode.go @@ -106,3 +106,63 @@ func DecodeCreateResponse(decoder func(*http.Response) goahttp.Decoder, restoreB } } } + +// BuildTaskResultRequest instantiates a HTTP request object with method and +// path set to call the "task" service "TaskResult" endpoint +func (c *Client) BuildTaskResultRequest(ctx context.Context, v interface{}) (*http.Request, error) { + var ( + taskID string + ) + { + p, ok := v.(*task.TaskResultRequest) + if !ok { + return nil, goahttp.ErrInvalidType("task", "TaskResult", "*task.TaskResultRequest", v) + } + taskID = p.TaskID + } + u := &url.URL{Scheme: c.scheme, Host: c.host, Path: TaskResultTaskPath(taskID)} + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, goahttp.ErrInvalidURL("task", "TaskResult", u.String(), err) + } + if ctx != nil { + req = req.WithContext(ctx) + } + + return req, nil +} + +// DecodeTaskResultResponse returns a decoder for responses returned by the +// task TaskResult endpoint. restoreBody controls whether the response body +// should be restored after having been read. +func DecodeTaskResultResponse(decoder func(*http.Response) goahttp.Decoder, restoreBody bool) func(*http.Response) (interface{}, error) { + return func(resp *http.Response) (interface{}, error) { + if restoreBody { + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + defer func() { + resp.Body = ioutil.NopCloser(bytes.NewBuffer(b)) + }() + } else { + defer resp.Body.Close() + } + switch resp.StatusCode { + case http.StatusOK: + var ( + body interface{} + err error + ) + err = decoder(resp).Decode(&body) + if err != nil { + return nil, goahttp.ErrDecodingError("task", "TaskResult", err) + } + return body, nil + default: + body, _ := ioutil.ReadAll(resp.Body) + return nil, goahttp.ErrInvalidResponse("task", "TaskResult", resp.StatusCode, string(body)) + } + } +} diff --git a/gen/http/task/client/paths.go b/gen/http/task/client/paths.go index f3f5bc05dd782f6183c29b1eeed9f3fc94b26f6a..1b210e99f5d88b733fea49e43fcfc6549c8d58c0 100644 --- a/gen/http/task/client/paths.go +++ b/gen/http/task/client/paths.go @@ -15,3 +15,8 @@ import ( func CreateTaskPath(taskName string) string { return fmt.Sprintf("/v1/task/%v", taskName) } + +// TaskResultTaskPath returns the URL path to the task service TaskResult HTTP endpoint. +func TaskResultTaskPath(taskID string) string { + return fmt.Sprintf("/v1/taskResult/%v", taskID) +} diff --git a/gen/http/task/server/encode_decode.go b/gen/http/task/server/encode_decode.go index f4549af31a070fd6f67faa7273753f1475c0929b..716de0ada1b13cec9bb4401ef6e06a8ec19b3a26 100644 --- a/gen/http/task/server/encode_decode.go +++ b/gen/http/task/server/encode_decode.go @@ -66,3 +66,31 @@ func DecodeCreateRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp. return payload, nil } } + +// EncodeTaskResultResponse returns an encoder for responses returned by the +// task TaskResult endpoint. +func EncodeTaskResultResponse(encoder func(context.Context, http.ResponseWriter) goahttp.Encoder) func(context.Context, http.ResponseWriter, interface{}) error { + return func(ctx context.Context, w http.ResponseWriter, v interface{}) error { + res, _ := v.(interface{}) + enc := encoder(ctx, w) + body := res + w.WriteHeader(http.StatusOK) + return enc.Encode(body) + } +} + +// DecodeTaskResultRequest returns a decoder for requests sent to the task +// TaskResult endpoint. +func DecodeTaskResultRequest(mux goahttp.Muxer, decoder func(*http.Request) goahttp.Decoder) func(*http.Request) (interface{}, error) { + return func(r *http.Request) (interface{}, error) { + var ( + taskID string + + params = mux.Vars(r) + ) + taskID = params["taskID"] + payload := NewTaskResultRequest(taskID) + + return payload, nil + } +} diff --git a/gen/http/task/server/paths.go b/gen/http/task/server/paths.go index 6d72cc965b9783a051a2df93e48ee1f394d21188..cd34675679b694ad6b137837079466ddad4a3228 100644 --- a/gen/http/task/server/paths.go +++ b/gen/http/task/server/paths.go @@ -15,3 +15,8 @@ import ( func CreateTaskPath(taskName string) string { return fmt.Sprintf("/v1/task/%v", taskName) } + +// TaskResultTaskPath returns the URL path to the task service TaskResult HTTP endpoint. +func TaskResultTaskPath(taskID string) string { + return fmt.Sprintf("/v1/taskResult/%v", taskID) +} diff --git a/gen/http/task/server/server.go b/gen/http/task/server/server.go index 8dcbd4c96344e3dce09c4b848f33c8a7ccb8c263..589e54c33f58b7988bb7fbf8d374f5b881ee0bf5 100644 --- a/gen/http/task/server/server.go +++ b/gen/http/task/server/server.go @@ -18,8 +18,9 @@ import ( // Server lists the task service endpoint HTTP handlers. type Server struct { - Mounts []*MountPoint - Create http.Handler + Mounts []*MountPoint + Create http.Handler + TaskResult http.Handler } // ErrorNamer is an interface implemented by generated error structs that @@ -56,8 +57,10 @@ func New( return &Server{ Mounts: []*MountPoint{ {"Create", "POST", "/v1/task/{taskName}"}, + {"TaskResult", "GET", "/v1/taskResult/{taskID}"}, }, - Create: NewCreateHandler(e.Create, mux, decoder, encoder, errhandler, formatter), + Create: NewCreateHandler(e.Create, mux, decoder, encoder, errhandler, formatter), + TaskResult: NewTaskResultHandler(e.TaskResult, mux, decoder, encoder, errhandler, formatter), } } @@ -67,11 +70,13 @@ func (s *Server) Service() string { return "task" } // Use wraps the server handlers with the given middleware. func (s *Server) Use(m func(http.Handler) http.Handler) { s.Create = m(s.Create) + s.TaskResult = m(s.TaskResult) } // Mount configures the mux to serve the task endpoints. func Mount(mux goahttp.Muxer, h *Server) { MountCreateHandler(mux, h.Create) + MountTaskResultHandler(mux, h.TaskResult) } // Mount configures the mux to serve the task endpoints. @@ -129,3 +134,54 @@ func NewCreateHandler( } }) } + +// MountTaskResultHandler configures the mux to serve the "task" service +// "TaskResult" endpoint. +func MountTaskResultHandler(mux goahttp.Muxer, h http.Handler) { + f, ok := h.(http.HandlerFunc) + if !ok { + f = func(w http.ResponseWriter, r *http.Request) { + h.ServeHTTP(w, r) + } + } + mux.Handle("GET", "/v1/taskResult/{taskID}", f) +} + +// NewTaskResultHandler creates a HTTP handler which loads the HTTP request and +// calls the "task" service "TaskResult" endpoint. +func NewTaskResultHandler( + endpoint goa.Endpoint, + mux goahttp.Muxer, + decoder func(*http.Request) goahttp.Decoder, + encoder func(context.Context, http.ResponseWriter) goahttp.Encoder, + errhandler func(context.Context, http.ResponseWriter, error), + formatter func(err error) goahttp.Statuser, +) http.Handler { + var ( + decodeRequest = DecodeTaskResultRequest(mux, decoder) + encodeResponse = EncodeTaskResultResponse(encoder) + encodeError = goahttp.ErrorEncoder(encoder, formatter) + ) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), goahttp.AcceptTypeKey, r.Header.Get("Accept")) + ctx = context.WithValue(ctx, goa.MethodKey, "TaskResult") + ctx = context.WithValue(ctx, goa.ServiceKey, "task") + payload, err := decodeRequest(r) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + res, err := endpoint(ctx, payload) + if err != nil { + if err := encodeError(ctx, w, err); err != nil { + errhandler(ctx, w, err) + } + return + } + if err := encodeResponse(ctx, w, res); err != nil { + errhandler(ctx, w, err) + } + }) +} diff --git a/gen/http/task/server/types.go b/gen/http/task/server/types.go index de98cf1a081580e66992b3dd65a5c5e6295d7a99..c1482c9a8bc09f8781fd87e399293afa5c4392c6 100644 --- a/gen/http/task/server/types.go +++ b/gen/http/task/server/types.go @@ -39,3 +39,11 @@ func NewCreateRequest(body interface{}, taskName string, cacheNamespace *string, return res } + +// NewTaskResultRequest builds a task service TaskResult endpoint payload. +func NewTaskResultRequest(taskID string) *task.TaskResultRequest { + v := &task.TaskResultRequest{} + v.TaskID = taskID + + return v +} diff --git a/gen/task/client.go b/gen/task/client.go index fb003d3f4bb7fbb112d0844cbf9ec1a3a80e87cd..3f812485b1c4b9ff706e6278e0bb385f7f4da636 100644 --- a/gen/task/client.go +++ b/gen/task/client.go @@ -15,13 +15,15 @@ import ( // Client is the "task" service client. type Client struct { - CreateEndpoint goa.Endpoint + CreateEndpoint goa.Endpoint + TaskResultEndpoint goa.Endpoint } // NewClient initializes a "task" service client given the endpoints. -func NewClient(create goa.Endpoint) *Client { +func NewClient(create, taskResult goa.Endpoint) *Client { return &Client{ - CreateEndpoint: create, + CreateEndpoint: create, + TaskResultEndpoint: taskResult, } } @@ -34,3 +36,13 @@ func (c *Client) Create(ctx context.Context, p *CreateRequest) (res *CreateResul } return ires.(*CreateResult), nil } + +// TaskResult calls the "TaskResult" endpoint of the "task" service. +func (c *Client) TaskResult(ctx context.Context, p *TaskResultRequest) (res interface{}, err error) { + var ires interface{} + ires, err = c.TaskResultEndpoint(ctx, p) + if err != nil { + return + } + return ires.(interface{}), nil +} diff --git a/gen/task/endpoints.go b/gen/task/endpoints.go index c5d61e36c2451113c04536a2b125e790c55c7c83..b648dd83cb34f626e54dbfba734439cbfbe813ca 100644 --- a/gen/task/endpoints.go +++ b/gen/task/endpoints.go @@ -15,19 +15,22 @@ import ( // Endpoints wraps the "task" service endpoints. type Endpoints struct { - Create goa.Endpoint + Create goa.Endpoint + TaskResult goa.Endpoint } // NewEndpoints wraps the methods of the "task" service with endpoints. func NewEndpoints(s Service) *Endpoints { return &Endpoints{ - Create: NewCreateEndpoint(s), + Create: NewCreateEndpoint(s), + TaskResult: NewTaskResultEndpoint(s), } } // Use applies the given middleware to all the "task" service endpoints. func (e *Endpoints) Use(m func(goa.Endpoint) goa.Endpoint) { e.Create = m(e.Create) + e.TaskResult = m(e.TaskResult) } // NewCreateEndpoint returns an endpoint function that calls the method @@ -38,3 +41,12 @@ func NewCreateEndpoint(s Service) goa.Endpoint { return s.Create(ctx, p) } } + +// NewTaskResultEndpoint returns an endpoint function that calls the method +// "TaskResult" of service "task". +func NewTaskResultEndpoint(s Service) goa.Endpoint { + return func(ctx context.Context, req interface{}) (interface{}, error) { + p := req.(*TaskResultRequest) + return s.TaskResult(ctx, p) + } +} diff --git a/gen/task/service.go b/gen/task/service.go index 13a5d28ac0ffbde98d80d9c898cdb789b16c2faa..f5c4b6954680960745e92c7556d1f243413b7647 100644 --- a/gen/task/service.go +++ b/gen/task/service.go @@ -15,6 +15,8 @@ import ( type Service interface { // Create a task and put it in a queue for execution. Create(context.Context, *CreateRequest) (res *CreateResult, err error) + // TaskResult retrieves task result from the Cache service. + TaskResult(context.Context, *TaskResultRequest) (res interface{}, err error) } // ServiceName is the name of the service as defined in the design. This is the @@ -25,7 +27,7 @@ const ServiceName = "task" // MethodNames lists the service method names as defined in the design. These // are the same values that are set in the endpoint request contexts under the // MethodKey key. -var MethodNames = [1]string{"Create"} +var MethodNames = [2]string{"Create", "TaskResult"} // CreateRequest is the payload type of the task service Create method. type CreateRequest struct { @@ -44,3 +46,9 @@ type CreateResult struct { // Unique task identifier. TaskID string } + +// TaskResultRequest is the payload type of the task service TaskResult method. +type TaskResultRequest struct { + // Unique task identifier. + TaskID string +} diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go index dc9033bd88e9d35fa5a4663d213dac66a81a491e..7ad51dceaab26442febc5d8afffb29eba1812727 100644 --- a/internal/clients/cache/client.go +++ b/internal/clients/cache/client.go @@ -4,7 +4,10 @@ import ( "bytes" "context" "fmt" + "io" "net/http" + + "code.vereign.com/gaiax/tsa/golib/errors" ) // Client for the Cache service. @@ -27,7 +30,7 @@ func New(addr string, opts ...Option) *Client { } func (c *Client) Set(ctx context.Context, key, namespace, scope string, value []byte) error { - req, err := http.NewRequest("POST", c.addr+"/v1/cache", bytes.NewReader(value)) + req, err := http.NewRequestWithContext(ctx, "POST", c.addr+"/v1/cache", bytes.NewReader(value)) if err != nil { return err } @@ -38,19 +41,44 @@ func (c *Client) Set(ctx context.Context, key, namespace, scope string, value [] "x-cache-scope": []string{scope}, } - resp, err := c.httpClient.Do(req.WithContext(ctx)) + resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() // nolint:errcheck if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected response code: %d", resp.StatusCode) + msg := fmt.Sprintf("unexpected response: %d %s", resp.StatusCode, resp.Status) + return errors.New(errors.GetKind(resp.StatusCode), msg) } return nil } func (c *Client) Get(ctx context.Context, key, namespace, scope string) ([]byte, error) { - return nil, fmt.Errorf("not implemented") + req, err := http.NewRequestWithContext(ctx, "GET", c.addr+"/v1/cache", nil) + req.Header = http.Header{ + "x-cache-key": []string{key}, + "x-cache-namespace": []string{namespace}, + "x-cache-scope": []string{scope}, + } + if err != nil { + return nil, err + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, errors.New(errors.NotFound) + } + msg := fmt.Sprintf("unexpected response: %d %s", resp.StatusCode, resp.Status) + return nil, errors.New(errors.GetKind(resp.StatusCode), msg) + } + + return io.ReadAll(resp.Body) } diff --git a/internal/executor/worker.go b/internal/executor/worker.go index feb9d3f81eddbd82698bd4d14ca373a6572cc1e5..aea032d32ef535db6a96d4b28a94335cd9379f1c 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -107,6 +107,7 @@ func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task if err != nil { return nil, errors.New("error evaluating request policy", err) } + task.ResponseCode = http.StatusOK } else if task.URL != "" && task.Method != "" { var status int status, response, err = w.doHTTPTask(ctx, task) diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 78b713dc31d6accfbeda69bd74f5cf406e2c82ab..098c7bb27c8739053c5965ab6884b80db7029e09 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "context" "encoding/json" "time" @@ -15,6 +16,8 @@ import ( // Storage for retrieving predefined task templates. type Storage interface { TaskTemplate(ctx context.Context, taskName string) (*Task, error) + Task(ctx context.Context, taskID string) (*Task, error) + TaskHistory(ctx context.Context, taskID string) (*Task, error) } // Queue interface for retrieving, returning and removing tasks from Queue. @@ -25,17 +28,23 @@ type Queue interface { Unack(ctx context.Context, task *Task) error } +type Cache interface { + Get(ctx context.Context, key, namespace, scope string) ([]byte, error) +} + type Service struct { storage Storage queue Queue + cache Cache logger *zap.Logger } // New creates the task service. -func New(template Storage, queue Queue, logger *zap.Logger) *Service { +func New(template Storage, queue Queue, cache Cache, logger *zap.Logger) *Service { return &Service{ storage: template, queue: queue, + cache: cache, logger: logger, } } @@ -77,3 +86,47 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * return &goatask.CreateResult{TaskID: task.ID}, nil } + +// TaskResult retrieves task result from the Cache service. +func (s *Service) TaskResult(ctx context.Context, req *goatask.TaskResultRequest) (res interface{}, err error) { + if req.TaskID == "" { + return nil, errors.New(errors.BadRequest, "missing taskID") + } + + logger := s.logger.With(zap.String("taskID", req.TaskID)) + + var task *Task + task, err = s.storage.TaskHistory(ctx, req.TaskID) + if err != nil && !errors.Is(errors.NotFound, err) { + logger.Error("error getting task from history collection", zap.Error(err)) + return nil, err + } + + if task == nil { + task, err = s.storage.Task(ctx, req.TaskID) + if err != nil { + if !errors.Is(errors.NotFound, err) { + logger.Error("error getting task from history collection", zap.Error(err)) + } + return nil, err + } + } + + if task.State != Done { + return nil, errors.New(errors.NotFound, "no result, task is not completed") + } + + value, err := s.cache.Get(ctx, task.ID, task.CacheNamespace, task.CacheScope) + if err != nil { + logger.Error("error getting task result from cache", zap.Error(err)) + return nil, err + } + + var result interface{} + if err := json.NewDecoder(bytes.NewReader(value)).Decode(&result); err != nil { + logger.Error("error decoding result from cache", zap.Error(err)) + return nil, errors.New("error decoding result from cache", err) + } + + return result, nil +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 0e314d761b3cf3ce6f95a2ae959b180a6f7a6c54..f5b44125ce771b4bec90965e4637bd8e115906b9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -118,3 +118,43 @@ func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error { b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) return backoff.Retry(insert, b) } + +func (s *Storage) Task(ctx context.Context, taskID string) (*task.Task, error) { + result := s.tasks.FindOne(ctx, bson.M{ + "id": taskID, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +} + +func (s *Storage) TaskHistory(ctx context.Context, taskID string) (*task.Task, error) { + result := s.tasksHistory.FindOne(ctx, bson.M{ + "id": taskID, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +}