Skip to content
Snippets Groups Projects
Commit cc03638c authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Policy evaluation client

parent 5d03d0dd
No related branches found
No related tags found
No related merge requests found
Pipeline #50494 passed
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server" goatasksrv "code.vereign.com/gaiax/tsa/task/gen/http/task/server"
"code.vereign.com/gaiax/tsa/task/gen/openapi" "code.vereign.com/gaiax/tsa/task/gen/openapi"
goatask "code.vereign.com/gaiax/tsa/task/gen/task" goatask "code.vereign.com/gaiax/tsa/task/gen/task"
"code.vereign.com/gaiax/tsa/task/internal/clients/policy"
"code.vereign.com/gaiax/tsa/task/internal/config" "code.vereign.com/gaiax/tsa/task/internal/config"
"code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/executor"
"code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service"
...@@ -64,10 +65,13 @@ func main() { ...@@ -64,10 +65,13 @@ func main() {
// create storage // create storage
storage := storage.New(db) storage := storage.New(db)
// create policy client
policy := policy.New(cfg.Policy.Addr, httpClient())
// create task executor // create task executor
executor := executor.New( executor := executor.New(
storage, storage,
nil, policy,
cfg.Executor.Workers, cfg.Executor.Workers,
cfg.Executor.PollInterval, cfg.Executor.PollInterval,
httpClient(), httpClient(),
......
package policy package policy
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"net/http"
"net/url"
"code.vereign.com/gaiax/tsa/golib/errors"
) )
type Client struct { type Client struct {
addr string addr string
httpClient *http.Client
} }
func New(addr string) *Client { func New(addr string, httpClient *http.Client) *Client {
return &Client{ return &Client{
addr: addr, addr: addr,
httpClient: httpClient,
} }
} }
// Evaluate calls the policy service to execute the given policy.
func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) { func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) {
return nil, fmt.Errorf("not implemented") uri := c.addr + "/policy/" + policy + "/evaluation"
policyURL, err := url.ParseRequestURI(uri)
if err != nil {
return nil, errors.New(errors.BadRequest, "invalid policy evaluation URL", err)
}
req, err := http.NewRequest("POST", policyURL.String(), bytes.NewReader(data))
if err != nil {
return nil, err
}
resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response on policy evaluation: %d", resp.StatusCode)
}
return io.ReadAll(resp.Body)
} }
...@@ -5,6 +5,7 @@ import "time" ...@@ -5,6 +5,7 @@ import "time"
type Config struct { type Config struct {
HTTP httpConfig HTTP httpConfig
Mongo mongoConfig Mongo mongoConfig
Policy policyConfig
Executor executorConfig Executor executorConfig
LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
...@@ -24,6 +25,10 @@ type mongoConfig struct { ...@@ -24,6 +25,10 @@ type mongoConfig struct {
Pass string `envconfig:"MONGO_PASS" required:"true"` Pass string `envconfig:"MONGO_PASS" required:"true"`
} }
type policyConfig struct {
Addr string `envconfig:"POLICY_ADDR" required:"true"`
}
type executorConfig struct { type executorConfig struct {
Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"`
PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"`
......
...@@ -12,10 +12,12 @@ import ( ...@@ -12,10 +12,12 @@ import (
"code.vereign.com/gaiax/tsa/task/internal/service/task" "code.vereign.com/gaiax/tsa/task/internal/service/task"
) )
// Policy client.
type Policy interface { type Policy interface {
Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
} }
// Queue allows retrieving, returning and deleting tasks from storage.
type Queue interface { type Queue interface {
Poll(ctx context.Context) (*task.Task, error) Poll(ctx context.Context) (*task.Task, error)
Ack(ctx context.Context, task *task.Task) error Ack(ctx context.Context, task *task.Task) error
......
...@@ -92,7 +92,7 @@ func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { ...@@ -92,7 +92,7 @@ func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
return &task, nil return &task, nil
} }
// Ack removes a task from the tasks collection. // Ack removes a task from the `tasks` collection.
func (s *Storage) Ack(ctx context.Context, task *task.Task) error { func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
_, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID}) _, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID})
return err return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment