diff --git a/internal/clients/cache/client.go b/internal/clients/cache/client.go new file mode 100644 index 0000000000000000000000000000000000000000..87051977ef8f4943fab4e40496051c873398aad3 --- /dev/null +++ b/internal/clients/cache/client.go @@ -0,0 +1,36 @@ +package cache + +import ( + "context" + "go.uber.org/zap" + "net/http" +) + +// Client for Cache service. +type Client struct { + addr string + httpClient *http.Client + logger *zap.Logger +} + +func New(addr string, opts ...Option) *Client { + c := &Client{ + addr: addr, + httpClient: http.DefaultClient, + logger: zap.NewNop(), + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +func (c *Client) Set(ctx context.Context, value []byte) error { + return nil +} + +func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { + return nil, nil +} diff --git a/internal/clients/cache/option.go b/internal/clients/cache/option.go new file mode 100644 index 0000000000000000000000000000000000000000..2e9e693d3dfb62924509cc6b368d4cda1a0d295c --- /dev/null +++ b/internal/clients/cache/option.go @@ -0,0 +1,20 @@ +package cache + +import ( + "go.uber.org/zap" + "net/http" +) + +type Option func(*Client) + +func WithHTTPClient(client *http.Client) Option { + return func(c *Client) { + c.httpClient = client + } +} + +func WithLogger(logger *zap.Logger) Option { + return func(c *Client) { + c.logger = logger + } +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go new file mode 100644 index 0000000000000000000000000000000000000000..288ce76ec69f461fd6b9b2419c4cb6f757609d6c --- /dev/null +++ b/internal/executor/executor.go @@ -0,0 +1,153 @@ +package executor + +import ( + "bytes" + "context" + "io" + "net/http" + "sync" + "time" + + "go.uber.org/zap" + + "code.vereign.com/gaiax/tsa/golib/errors" + "code.vereign.com/gaiax/tsa/task/internal/service/task" +) + +const workers = 5 + +type Queue interface { + Poll(ctx context.Context) (*task.Task, error) + Ack(ctx context.Context, task *task.Task) error + Unack(ctx context.Context, task *task.Task) error +} + +type Executor struct { + queue Queue + workers int + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func New() *Executor { + return &Executor{ + httpClient: http.DefaultClient, + } +} + +func (e *Executor) Start(ctx context.Context) { + if e.workers == 0 { + e.workers = workers + } + + var wg sync.WaitGroup + for i := 0; i < e.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + e.Worker(ctx) + }() + } + + <-ctx.Done() + wg.Wait() + + e.logger.Info("task executor stopped") +} + +func (e *Executor) Worker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + e.logger.Debug("task execution worker stopped") + return + case <-time.After(e.pollInterval): + task, err := e.queue.Poll(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + e.logger.Error("error getting task from queue", zap.Error(err)) + } + continue + } + + logger := e.logger.With( + zap.String("taskID", task.ID), + zap.String("taskName", task.Name), + ) + + executedTask, err := e.Execute(ctx, task) + if err != nil { + logger.Error("error executing task", zap.Error(err)) + if err := e.queue.Unack(ctx, task); err != nil { + logger.Error("failed to unack task in queue", zap.Error(err)) + } + continue + } + + if len(executedTask.CacheKey) > 0 { + if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { + // TODO(penkovski): should task execution repeat if result cannot be cached? + logger.Error("failed to write task result in cache", zap.Error(err)) + } + } + + if err := e.saveTaskHistory(ctx, executedTask); err != nil { + logger.Error("failed to save task in history", zap.Error(err)) + } + } + } +} + +func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { + // TODO: evaluate request policy + t.StartedAt = time.Now() + status, response, err := e.execute(ctx, t) + if err != nil { + return nil, err + } + + t.State = task.Done + t.ResponseCode = status + t.Response = response + + if t.ResponsePolicy != "" { + // TODO: evaluate response policy + } + + if t.FinalPolicy != "" { + // TODO: evaluate finalizer policy + } + + t.FinishedAt = time.Now() + return t, nil +} + +func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { + req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) + if err != nil { + return 0, nil, errors.New("error creating http request", err) + } + + resp, err := e.httpClient.Do(req) + if err != nil { + return 0, nil, errors.New("error executing http request", err) + } + defer resp.Body.Close() + + response, err = io.ReadAll(resp.Body) + if err != nil { + return 0, nil, errors.New("error reading response body", err) + } + + return resp.StatusCode, response, nil +} + +func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { + return errors.New("not implemented") +} + +func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { + return errors.New("not implemented") +}