diff --git a/cmd/task/main.go b/cmd/task/main.go index 324743a9f3b9a19795f86582e14f9cae79c8e24d..d721e0ce5e28067f84f04a7ff67aaff4d8477b8d 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -24,6 +24,7 @@ import ( goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" + "code.vereign.com/gaiax/tsa/task/internal/clients/policy" "code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/service" @@ -64,10 +65,13 @@ func main() { // create storage storage := storage.New(db) + // create policy client + policy := policy.New(cfg.Policy.Addr, httpClient()) + // create task executor executor := executor.New( storage, - nil, + policy, cfg.Executor.Workers, cfg.Executor.PollInterval, httpClient(), diff --git a/internal/clients/policy/client.go b/internal/clients/policy/client.go index c727695766d6b554ab9818a025b716aaa30ee4ce..7803afc328b847c56906b4265c851aba784325a6 100644 --- a/internal/clients/policy/client.go +++ b/internal/clients/policy/client.go @@ -1,20 +1,50 @@ package policy import ( + "bytes" "context" "fmt" + "io" + "net/http" + "net/url" + + "code.vereign.com/gaiax/tsa/golib/errors" ) type Client struct { - addr string + addr string + httpClient *http.Client } -func New(addr string) *Client { +func New(addr string, httpClient *http.Client) *Client { return &Client{ - addr: addr, + addr: addr, + httpClient: httpClient, } } +// Evaluate calls the policy service to execute the given policy. func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) { - return nil, fmt.Errorf("not implemented") + uri := c.addr + "/policy/" + policy + "/evaluation" + policyURL, err := url.ParseRequestURI(uri) + if err != nil { + return nil, errors.New(errors.BadRequest, "invalid policy evaluation URL", err) + } + + req, err := http.NewRequest("POST", policyURL.String(), bytes.NewReader(data)) + if err != nil { + return nil, err + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected response on policy evaluation: %d", resp.StatusCode) + } + + return io.ReadAll(resp.Body) } diff --git a/internal/config/config.go b/internal/config/config.go index 6c07536218553442564a53ae86aae834b4e3a588..a7a167e08afd10522e08222cb5d205563fe99be6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import "time" type Config struct { HTTP httpConfig Mongo mongoConfig + Policy policyConfig Executor executorConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` @@ -24,6 +25,10 @@ type mongoConfig struct { Pass string `envconfig:"MONGO_PASS" required:"true"` } +type policyConfig struct { + Addr string `envconfig:"POLICY_ADDR" required:"true"` +} + type executorConfig struct { Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 896a59f5e3fc420e72437c0099a22d9a5950ba58..f49ae4d0316ac5702d7916ed659aae2917d35c26 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -12,10 +12,12 @@ import ( "code.vereign.com/gaiax/tsa/task/internal/service/task" ) +// Policy client. type Policy interface { Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) } +// Queue allows retrieving, returning and deleting tasks from storage. type Queue interface { Poll(ctx context.Context) (*task.Task, error) Ack(ctx context.Context, task *task.Task) error diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 049de8d4237ccaccb4397211b9fc354556ab99c9..d92e4226f3e62969c411a5913a2e353072ff4568 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -92,7 +92,7 @@ func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { return &task, nil } -// Ack removes a task from the tasks collection. +// Ack removes a task from the `tasks` collection. func (s *Storage) Ack(ctx context.Context, task *task.Task) error { _, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID}) return err