diff --git a/cmd/task/main.go b/cmd/task/main.go index 0e879c569aaa2eaa205d8c4176547c24071a670f..324743a9f3b9a19795f86582e14f9cae79c8e24d 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "net" "net/http" "time" @@ -24,6 +25,7 @@ import ( "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" "code.vereign.com/gaiax/tsa/task/internal/config" + "code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" "code.vereign.com/gaiax/tsa/task/internal/service/task" @@ -62,6 +64,16 @@ func main() { // create storage storage := storage.New(db) + // create task executor + executor := executor.New( + storage, + nil, + cfg.Executor.Workers, + cfg.Executor.PollInterval, + httpClient(), + logger, + ) + // create services var ( taskSvc goatask.Service @@ -134,6 +146,9 @@ func main() { } return errors.New("server stopped successfully") }) + g.Go(func() error { + return executor.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } @@ -161,3 +176,19 @@ func createLogger(logLevel string, opts ...zap.Option) (*zap.Logger, error) { func errFormatter(e error) goahttp.Statuser { return service.NewErrorResponse(e) } + +func httpClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 60 * time.Second, + }, + Timeout: 30 * time.Second, + } +} diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go index 87051977ef8f4943fab4e40496051c873398aad3..b4577a6b24f7c5dfc6a4fdd45db420abaf395657 100644 --- a/internal/clients/cache/client.go +++ b/internal/clients/cache/client.go @@ -2,22 +2,20 @@ package cache import ( "context" - "go.uber.org/zap" + "fmt" "net/http" ) -// Client for Cache service. +// Client for the Cache service. type Client struct { addr string httpClient *http.Client - logger *zap.Logger } func New(addr string, opts ...Option) *Client { c := &Client{ addr: addr, httpClient: http.DefaultClient, - logger: zap.NewNop(), } for _, opt := range opts { @@ -28,9 +26,9 @@ func New(addr string, opts ...Option) *Client { } func (c *Client) Set(ctx context.Context, value []byte) error { - return nil + return fmt.Errorf("not implemented") } func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { - return nil, nil + return nil, fmt.Errorf("not implemented") } diff --git a/internal/clients/cache/option.go b/internal/clients/cache/option.go index 2e9e693d3dfb62924509cc6b368d4cda1a0d295c..10ef93337d96c9319a0d4dff4333024a581fa921 100644 --- a/internal/clients/cache/option.go +++ b/internal/clients/cache/option.go @@ -1,7 +1,6 @@ package cache import ( - "go.uber.org/zap" "net/http" ) @@ -12,9 +11,3 @@ func WithHTTPClient(client *http.Client) Option { c.httpClient = client } } - -func WithLogger(logger *zap.Logger) Option { - return func(c *Client) { - c.logger = logger - } -} diff --git a/internal/clients/policy/client.go b/internal/clients/policy/client.go new file mode 100644 index 0000000000000000000000000000000000000000..c727695766d6b554ab9818a025b716aaa30ee4ce --- /dev/null +++ b/internal/clients/policy/client.go @@ -0,0 +1,20 @@ +package policy + +import ( + "context" + "fmt" +) + +type Client struct { + addr string +} + +func New(addr string) *Client { + return &Client{ + addr: addr, + } +} + +func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) { + return nil, fmt.Errorf("not implemented") +} diff --git a/internal/config/config.go b/internal/config/config.go index 0d0b375d7984af01183f3a1cd30fa12116a63326..6c07536218553442564a53ae86aae834b4e3a588 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,8 +3,9 @@ package config import "time" type Config struct { - HTTP httpConfig - Mongo mongoConfig + HTTP httpConfig + Mongo mongoConfig + Executor executorConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -22,3 +23,8 @@ type mongoConfig struct { User string `envconfig:"MONGO_USER" required:"true"` Pass string `envconfig:"MONGO_PASS" required:"true"` } + +type executorConfig struct { + Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 288ce76ec69f461fd6b9b2419c4cb6f757609d6c..896a59f5e3fc420e72437c0099a22d9a5950ba58 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -1,9 +1,7 @@ package executor import ( - "bytes" "context" - "io" "net/http" "sync" "time" @@ -14,7 +12,9 @@ import ( "code.vereign.com/gaiax/tsa/task/internal/service/task" ) -const workers = 5 +type Policy interface { + Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) +} type Queue interface { Poll(ctx context.Context) (*task.Task, error) @@ -24,6 +24,7 @@ type Queue interface { type Executor struct { queue Queue + policy Policy workers int pollInterval time.Duration @@ -31,123 +32,56 @@ type Executor struct { logger *zap.Logger } -func New() *Executor { +func New( + queue Queue, + policy Policy, + workers int, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *Executor { return &Executor{ - httpClient: http.DefaultClient, + queue: queue, + policy: policy, + workers: workers, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, } } -func (e *Executor) Start(ctx context.Context) { - if e.workers == 0 { - e.workers = workers - } +func (e *Executor) Start(ctx context.Context) error { + defer e.logger.Info("task executor stopped") var wg sync.WaitGroup + tasks := make(chan *task.Task) for i := 0; i < e.workers; i++ { wg.Add(1) go func() { defer wg.Done() - e.Worker(ctx) + worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger) + worker.Start(ctx) }() } - <-ctx.Done() - wg.Wait() - - e.logger.Info("task executor stopped") -} - -func (e *Executor) Worker(ctx context.Context) { +loop: for { select { case <-ctx.Done(): - e.logger.Debug("task execution worker stopped") - return + break loop case <-time.After(e.pollInterval): - task, err := e.queue.Poll(ctx) + t, err := e.queue.Poll(ctx) if err != nil { if !errors.Is(errors.NotFound, err) { e.logger.Error("error getting task from queue", zap.Error(err)) } continue } - - logger := e.logger.With( - zap.String("taskID", task.ID), - zap.String("taskName", task.Name), - ) - - executedTask, err := e.Execute(ctx, task) - if err != nil { - logger.Error("error executing task", zap.Error(err)) - if err := e.queue.Unack(ctx, task); err != nil { - logger.Error("failed to unack task in queue", zap.Error(err)) - } - continue - } - - if len(executedTask.CacheKey) > 0 { - if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { - // TODO(penkovski): should task execution repeat if result cannot be cached? - logger.Error("failed to write task result in cache", zap.Error(err)) - } - } - - if err := e.saveTaskHistory(ctx, executedTask); err != nil { - logger.Error("failed to save task in history", zap.Error(err)) - } + tasks <- t // send task to the workers for execution } } -} -func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { - // TODO: evaluate request policy - t.StartedAt = time.Now() - status, response, err := e.execute(ctx, t) - if err != nil { - return nil, err - } - - t.State = task.Done - t.ResponseCode = status - t.Response = response - - if t.ResponsePolicy != "" { - // TODO: evaluate response policy - } - - if t.FinalPolicy != "" { - // TODO: evaluate finalizer policy - } - - t.FinishedAt = time.Now() - return t, nil -} - -func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { - req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) - if err != nil { - return 0, nil, errors.New("error creating http request", err) - } - - resp, err := e.httpClient.Do(req) - if err != nil { - return 0, nil, errors.New("error executing http request", err) - } - defer resp.Body.Close() - - response, err = io.ReadAll(resp.Body) - if err != nil { - return 0, nil, errors.New("error reading response body", err) - } - - return resp.StatusCode, response, nil -} - -func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { - return errors.New("not implemented") -} + wg.Wait() // wait all workers to stop -func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { - return errors.New("not implemented") + return ctx.Err() } diff --git a/internal/executor/worker.go b/internal/executor/worker.go new file mode 100644 index 0000000000000000000000000000000000000000..9f664c269bc14615e00145b09b000868f22b95d9 --- /dev/null +++ b/internal/executor/worker.go @@ -0,0 +1,136 @@ +package executor + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type Worker struct { + tasks chan *taskpkg.Task + queue Queue + policy Policy + httpClient *http.Client + logger *zap.Logger +} + +func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, httpClient *http.Client, logger *zap.Logger) *Worker { + return &Worker{ + tasks: tasks, + queue: queue, + policy: policy, + httpClient: httpClient, + logger: logger, + } +} + +func (w *Worker) Start(ctx context.Context) { + defer w.logger.Debug("task worker stopped") + + for { + select { + case <-ctx.Done(): + return + case t := <-w.tasks: + logger := w.logger.With( + zap.String("taskID", t.ID), + zap.String("taskName", t.Name), + ) + + executed, err := w.Execute(ctx, t) + if err != nil { + logger.Error("error executing task", zap.Error(err)) + if err := w.queue.Unack(ctx, t); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + + if err := w.saveTaskHistory(ctx, executed); err != nil { + logger.Error("error saving task history", zap.Error(err)) + continue + } + + // remove task from queue + if err := w.queue.Ack(ctx, executed); err != nil { + logger.Error("failed to ack task in queue", zap.Error(err)) + } + } + } +} + +func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) { + task.StartedAt = time.Now() + + // evaluate request policy + if task.RequestPolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + if err != nil { + return nil, errors.New("error evaluating request policy", err) + } + // overwrite task request with the one returned from the policy + task.Request = resp + } + + status, response, err := w.do(ctx, task) + if err != nil { + return nil, err + } + + task.State = taskpkg.Done + task.ResponseCode = status + task.Response = response + + // evaluate response policy + if task.ResponsePolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.ResponsePolicy, task.Response) + if err != nil { + return nil, errors.New("error evaluating response policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + // evaluate finalizer policy + if task.FinalPolicy != "" { + if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil { + return nil, errors.New("error evaluating final policy", err) + } + } + + task.FinishedAt = time.Now() + return task, nil + +} + +func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := w.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil +} + +func (w *Worker) saveTaskHistory(ctx context.Context, task *taskpkg.Task) error { + return fmt.Errorf("not implemented") +} diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 4a2a677974cbf2cf08d6143fdb59679ad8fd2869..78a25a4d8e6b10182d71fb0c9e9832e224a1b3bf 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -1,6 +1,8 @@ package task -import "time" +import ( + "time" +) type State string diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 09bb73956b7dbe7308121ff9e890ecf1354fc9bb..049de8d4237ccaccb4397211b9fc354556ab99c9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -16,17 +16,20 @@ const ( taskDB = "task" taskTemplates = "taskTemplates" taskQueue = "tasks" + tasksHistory = "tasksHistory" ) type Storage struct { - templates *mongo.Collection - tasks *mongo.Collection + templates *mongo.Collection + tasks *mongo.Collection + tasksHistory *mongo.Collection } func New(db *mongo.Client) *Storage { return &Storage{ - templates: db.Database(taskDB).Collection(taskTemplates), - tasks: db.Database(taskDB).Collection(taskQueue), + templates: db.Database(taskDB).Collection(taskTemplates), + tasks: db.Database(taskDB).Collection(taskQueue), + tasksHistory: db.Database(taskDB).Collection(tasksHistory), } }