diff --git a/cmd/task/main.go b/cmd/task/main.go index 0e879c569aaa2eaa205d8c4176547c24071a670f..324743a9f3b9a19795f86582e14f9cae79c8e24d 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "net" "net/http" "time" @@ -24,6 +25,7 @@ import ( "code.vereign.com/gaiax/tsa/task/gen/openapi" goatask "code.vereign.com/gaiax/tsa/task/gen/task" "code.vereign.com/gaiax/tsa/task/internal/config" + "code.vereign.com/gaiax/tsa/task/internal/executor" "code.vereign.com/gaiax/tsa/task/internal/service" "code.vereign.com/gaiax/tsa/task/internal/service/health" "code.vereign.com/gaiax/tsa/task/internal/service/task" @@ -62,6 +64,16 @@ func main() { // create storage storage := storage.New(db) + // create task executor + executor := executor.New( + storage, + nil, + cfg.Executor.Workers, + cfg.Executor.PollInterval, + httpClient(), + logger, + ) + // create services var ( taskSvc goatask.Service @@ -134,6 +146,9 @@ func main() { } return errors.New("server stopped successfully") }) + g.Go(func() error { + return executor.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } @@ -161,3 +176,19 @@ func createLogger(logLevel string, opts ...zap.Option) (*zap.Logger, error) { func errFormatter(e error) goahttp.Statuser { return service.NewErrorResponse(e) } + +func httpClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 60 * time.Second, + }, + Timeout: 30 * time.Second, + } +} diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go index 87051977ef8f4943fab4e40496051c873398aad3..b4577a6b24f7c5dfc6a4fdd45db420abaf395657 100644 --- a/internal/clients/cache/client.go +++ b/internal/clients/cache/client.go @@ -2,22 +2,20 @@ package cache import ( "context" - "go.uber.org/zap" + "fmt" "net/http" ) -// Client for Cache service. +// Client for the Cache service. type Client struct { addr string httpClient *http.Client - logger *zap.Logger } func New(addr string, opts ...Option) *Client { c := &Client{ addr: addr, httpClient: http.DefaultClient, - logger: zap.NewNop(), } for _, opt := range opts { @@ -28,9 +26,9 @@ func New(addr string, opts ...Option) *Client { } func (c *Client) Set(ctx context.Context, value []byte) error { - return nil + return fmt.Errorf("not implemented") } func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { - return nil, nil + return nil, fmt.Errorf("not implemented") } diff --git a/internal/clients/cache/option.go b/internal/clients/cache/option.go index 2e9e693d3dfb62924509cc6b368d4cda1a0d295c..10ef93337d96c9319a0d4dff4333024a581fa921 100644 --- a/internal/clients/cache/option.go +++ b/internal/clients/cache/option.go @@ -1,7 +1,6 @@ package cache import ( - "go.uber.org/zap" "net/http" ) @@ -12,9 +11,3 @@ func WithHTTPClient(client *http.Client) Option { c.httpClient = client } } - -func WithLogger(logger *zap.Logger) Option { - return func(c *Client) { - c.logger = logger - } -} diff --git a/internal/clients/policy/client.go b/internal/clients/policy/client.go new file mode 100644 index 0000000000000000000000000000000000000000..c727695766d6b554ab9818a025b716aaa30ee4ce --- /dev/null +++ b/internal/clients/policy/client.go @@ -0,0 +1,20 @@ +package policy + +import ( + "context" + "fmt" +) + +type Client struct { + addr string +} + +func New(addr string) *Client { + return &Client{ + addr: addr, + } +} + +func (c *Client) Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) { + return nil, fmt.Errorf("not implemented") +} diff --git a/internal/config/config.go b/internal/config/config.go index 0d0b375d7984af01183f3a1cd30fa12116a63326..6c07536218553442564a53ae86aae834b4e3a588 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,8 +3,9 @@ package config import "time" type Config struct { - HTTP httpConfig - Mongo mongoConfig + HTTP httpConfig + Mongo mongoConfig + Executor executorConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -22,3 +23,8 @@ type mongoConfig struct { User string `envconfig:"MONGO_USER" required:"true"` Pass string `envconfig:"MONGO_PASS" required:"true"` } + +type executorConfig struct { + Workers int `envconfig:"EXECUTOR_WORKERS" default:"5"` + PollInterval time.Duration `envconfig:"EXECUTOR_POLL_INTERVAL" default:"1s"` +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 8ac9dfe89be2b20e8ce811916741edc6224845a2..896a59f5e3fc420e72437c0099a22d9a5950ba58 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -1,153 +1,87 @@ -//package executor -// -//import ( -// "bytes" -// "context" -// "io" -// "net/http" -// "sync" -// "time" -// -// "go.uber.org/zap" -// -// "code.vereign.com/gaiax/tsa/golib/errors" -// "code.vereign.com/gaiax/tsa/task/internal/service/task" -//) -// -//const workers = 5 -// -//type Queue interface { -// Poll(ctx context.Context) (*task.Task, error) -// Ack(ctx context.Context, task *task.Task) error -// Unack(ctx context.Context, task *task.Task) error -//} -// -//type Executor struct { -// queue Queue -// workers int -// pollInterval time.Duration -// -// httpClient *http.Client -// logger *zap.Logger -//} -// -//func New() *Executor { -// return &Executor{ -// httpClient: http.DefaultClient, -// } -//} -// -//func (e *Executor) Start(ctx context.Context) { -// if e.workers == 0 { -// e.workers = workers -// } -// -// var wg sync.WaitGroup -// for i := 0; i < e.workers; i++ { -// wg.Add(1) -// go func() { -// defer wg.Done() -// e.Worker(ctx) -// }() -// } -// -// <-ctx.Done() -// wg.Wait() -// -// e.logger.Info("task executor stopped") -//} -// -//func (e *Executor) Worker(ctx context.Context) { -// for { -// select { -// case <-ctx.Done(): -// e.logger.Debug("task execution worker stopped") -// return -// case <-time.After(e.pollInterval): -// task, err := e.queue.Poll(ctx) -// if err != nil { -// if !errors.Is(errors.NotFound, err) { -// e.logger.Error("error getting task from queue", zap.Error(err)) -// } -// continue -// } -// -// logger := e.logger.With( -// zap.String("taskID", task.ID), -// zap.String("taskName", task.Name), -// ) -// -// executedTask, err := e.Execute(ctx, task) -// if err != nil { -// logger.Error("error executing task", zap.Error(err)) -// if err := e.queue.Unack(ctx, task); err != nil { -// logger.Error("failed to unack task in queue", zap.Error(err)) -// } -// continue -// } -// -// if len(executedTask.CacheKey) > 0 { -// if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { -// // TODO(penkovski): should task execution repeat if result cannot be cached? -// logger.Error("failed to write task result in cache", zap.Error(err)) -// } -// } -// -// if err := e.saveTaskHistory(ctx, executedTask); err != nil { -// logger.Error("failed to save task in history", zap.Error(err)) -// } -// } -// } -//} -// -//func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { -// // TODO: evaluate request policy -// t.StartedAt = time.Now() -// status, response, err := e.execute(ctx, t) -// if err != nil { -// return nil, err -// } -// -// t.State = task.Done -// t.ResponseCode = status -// t.Response = response -// -// if t.ResponsePolicy != "" { -// // TODO: evaluate response policy -// } -// -// if t.FinalPolicy != "" { -// // TODO: evaluate finalizer policy -// } -// -// t.FinishedAt = time.Now() -// return t, nil -//} -// -//func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { -// req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) -// if err != nil { -// return 0, nil, errors.New("error creating http request", err) -// } -// -// resp, err := e.httpClient.Do(req) -// if err != nil { -// return 0, nil, errors.New("error executing http request", err) -// } -// defer resp.Body.Close() -// -// response, err = io.ReadAll(resp.Body) -// if err != nil { -// return 0, nil, errors.New("error reading response body", err) -// } -// -// return resp.StatusCode, response, nil -//} -// -//func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { -// return errors.New("not implemented") -//} -// -//func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { -// return errors.New("not implemented") -//} +package executor + +import ( + "context" + "net/http" + "sync" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +type Policy interface { + Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error) +} + +type Queue interface { + Poll(ctx context.Context) (*task.Task, error) + Ack(ctx context.Context, task *task.Task) error + Unack(ctx context.Context, task *task.Task) error +} + +type Executor struct { + queue Queue + policy Policy + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New( + queue Queue, + policy Policy, + workers int, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *Executor { + return &Executor{ + queue: queue, + policy: policy, + workers: workers, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, + } +} + +func (e *Executor) Start(ctx context.Context) error { + defer e.logger.Info("task executor stopped") + + var wg sync.WaitGroup + tasks := make(chan *task.Task) + for i := 0; i < e.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger) + worker.Start(ctx) + }() + } + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(e.pollInterval): + t, err := e.queue.Poll(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + e.logger.Error("error getting task from queue", zap.Error(err)) + } + continue + } + tasks <- t // send task to the workers for execution + } + } + + wg.Wait() // wait all workers to stop + + return ctx.Err() +} diff --git a/internal/executor/executor2.go b/internal/executor/executor2.go deleted file mode 100644 index ec7f63e80ac5fefd17e0ca4335c1fcbb7c97532f..0000000000000000000000000000000000000000 --- a/internal/executor/executor2.go +++ /dev/null @@ -1,74 +0,0 @@ -package executor - -import ( - "context" - "net/http" - "sync" - "time" - - "go.uber.org/zap" - - "code.vereign.com/gaiax/tsa/golib/errors" - "code.vereign.com/gaiax/tsa/task/internal/service/task" -) - -const workers = 5 - -type Queue interface { - Poll(ctx context.Context) (*task.Task, error) - Ack(ctx context.Context, task *task.Task) error - Unack(ctx context.Context, task *task.Task) error -} - -type Executor struct { - queue Queue - workers int - pollInterval time.Duration - - httpClient *http.Client - logger *zap.Logger -} - -func New() *Executor { - return &Executor{} -} - -func (e *Executor) Start(ctx context.Context) error { - defer e.logger.Info("task executor stopped") - - if e.workers == 0 { - e.workers = workers - } - - var wg sync.WaitGroup - tasks := make(chan *task.Task) - for i := 0; i < e.workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - worker := newWorker(tasks, e.queue, e.httpClient, e.logger) - worker.Start(ctx) - }() - } - -loop: - for { - select { - case <-ctx.Done(): - break loop - case <-time.After(e.pollInterval): - t, err := e.queue.Poll(ctx) - if err != nil { - if !errors.Is(errors.NotFound, err) { - e.logger.Error("error getting task from queue", zap.Error(err)) - } - continue - } - tasks <- t // send task to the workers for execution - } - } - - wg.Wait() // wait all workers to stop - - return ctx.Err() -} diff --git a/internal/executor/worker.go b/internal/executor/worker.go index 65c9246905ae2d5b8ace5ccf979a5c3d130cd049..9f664c269bc14615e00145b09b000868f22b95d9 100644 --- a/internal/executor/worker.go +++ b/internal/executor/worker.go @@ -1,26 +1,32 @@ package executor import ( + "bytes" "context" "fmt" + "io" "net/http" + "time" "go.uber.org/zap" - "code.vereign.com/gaiax/tsa/task/internal/service/task" + "code.vereign.com/gaiax/tsa/golib/errors" + taskpkg "code.vereign.com/gaiax/tsa/task/internal/service/task" ) type Worker struct { - tasks chan *task.Task + tasks chan *taskpkg.Task queue Queue + policy Policy httpClient *http.Client logger *zap.Logger } -func newWorker(tasks chan *task.Task, queue Queue, httpClient *http.Client, logger *zap.Logger) *Worker { +func newWorker(tasks chan *taskpkg.Task, queue Queue, policy Policy, httpClient *http.Client, logger *zap.Logger) *Worker { return &Worker{ tasks: tasks, queue: queue, + policy: policy, httpClient: httpClient, logger: logger, } @@ -39,7 +45,7 @@ func (w *Worker) Start(ctx context.Context) { zap.String("taskName", t.Name), ) - executed, err := w.execute(ctx, t) + executed, err := w.Execute(ctx, t) if err != nil { logger.Error("error executing task", zap.Error(err)) if err := w.queue.Unack(ctx, t); err != nil { @@ -48,28 +54,83 @@ func (w *Worker) Start(ctx context.Context) { continue } - if err := w.cacheResult(ctx, executed); err != nil { - // TODO: should we remove the task from queue or it should be retried? - logger.Error("error caching task result", zap.Error(err)) - continue - } - if err := w.saveTaskHistory(ctx, executed); err != nil { logger.Error("error saving task history", zap.Error(err)) continue } + + // remove task from queue + if err := w.queue.Ack(ctx, executed); err != nil { + logger.Error("failed to ack task in queue", zap.Error(err)) + } } } } -func (w *Worker) execute(ctx context.Context, task *task.Task) (*task.Task, error) { - return nil, fmt.Errorf("not implemented") +func (w *Worker) Execute(ctx context.Context, task *taskpkg.Task) (*taskpkg.Task, error) { + task.StartedAt = time.Now() + + // evaluate request policy + if task.RequestPolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.RequestPolicy, task.Request) + if err != nil { + return nil, errors.New("error evaluating request policy", err) + } + // overwrite task request with the one returned from the policy + task.Request = resp + } + + status, response, err := w.do(ctx, task) + if err != nil { + return nil, err + } + + task.State = taskpkg.Done + task.ResponseCode = status + task.Response = response + + // evaluate response policy + if task.ResponsePolicy != "" { + resp, err := w.policy.Evaluate(ctx, task.ResponsePolicy, task.Response) + if err != nil { + return nil, errors.New("error evaluating response policy", err) + } + // overwrite task response with the one returned from the policy + task.Response = resp + } + + // evaluate finalizer policy + if task.FinalPolicy != "" { + if _, err := w.policy.Evaluate(ctx, task.FinalPolicy, task.Response); err != nil { + return nil, errors.New("error evaluating final policy", err) + } + } + + task.FinishedAt = time.Now() + return task, nil + } -func (w *Worker) cacheResult(ctx context.Context, task *task.Task) error { - return fmt.Errorf("not implemented") +func (w *Worker) do(ctx context.Context, task *taskpkg.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(task.Method, task.URL, bytes.NewReader(task.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := w.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil } -func (w *Worker) saveTaskHistory(ctx context.Context, task *task.Task) error { +func (w *Worker) saveTaskHistory(ctx context.Context, task *taskpkg.Task) error { return fmt.Errorf("not implemented") } diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 4a2a677974cbf2cf08d6143fdb59679ad8fd2869..78a25a4d8e6b10182d71fb0c9e9832e224a1b3bf 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -1,6 +1,8 @@ package task -import "time" +import ( + "time" +) type State string diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 09bb73956b7dbe7308121ff9e890ecf1354fc9bb..049de8d4237ccaccb4397211b9fc354556ab99c9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -16,17 +16,20 @@ const ( taskDB = "task" taskTemplates = "taskTemplates" taskQueue = "tasks" + tasksHistory = "tasksHistory" ) type Storage struct { - templates *mongo.Collection - tasks *mongo.Collection + templates *mongo.Collection + tasks *mongo.Collection + tasksHistory *mongo.Collection } func New(db *mongo.Client) *Storage { return &Storage{ - templates: db.Database(taskDB).Collection(taskTemplates), - tasks: db.Database(taskDB).Collection(taskQueue), + templates: db.Database(taskDB).Collection(taskTemplates), + tasks: db.Database(taskDB).Collection(taskQueue), + tasksHistory: db.Database(taskDB).Collection(tasksHistory), } }