Skip to content
Snippets Groups Projects
Commit 9a50f69c authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

WIP: ...

parent 75d4000f
No related branches found
No related tags found
1 merge request!4Task execution of independent tasks
package cache
import (
"context"
"go.uber.org/zap"
"net/http"
)
// Client for Cache service.
type Client struct {
addr string
httpClient *http.Client
logger *zap.Logger
}
func New(addr string, opts ...Option) *Client {
c := &Client{
addr: addr,
httpClient: http.DefaultClient,
logger: zap.NewNop(),
}
for _, opt := range opts {
opt(c)
}
return c
}
func (c *Client) Set(ctx context.Context, value []byte) error {
return nil
}
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}
package cache
import (
"go.uber.org/zap"
"net/http"
)
type Option func(*Client)
func WithHTTPClient(client *http.Client) Option {
return func(c *Client) {
c.httpClient = client
}
}
func WithLogger(logger *zap.Logger) Option {
return func(c *Client) {
c.logger = logger
}
}
package executor
import (
"bytes"
"context"
"io"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
const workers = 5
type Queue interface {
Poll(ctx context.Context) (*task.Task, error)
Ack(ctx context.Context, task *task.Task) error
Unack(ctx context.Context, task *task.Task) error
}
type Executor struct {
queue Queue
workers int
pollInterval time.Duration
httpClient *http.Client
logger *zap.Logger
}
func New() *Executor {
return &Executor{
httpClient: http.DefaultClient,
}
}
func (e *Executor) Start(ctx context.Context) {
if e.workers == 0 {
e.workers = workers
}
var wg sync.WaitGroup
for i := 0; i < e.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
e.Worker(ctx)
}()
}
<-ctx.Done()
wg.Wait()
e.logger.Info("task executor stopped")
}
func (e *Executor) Worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
e.logger.Debug("task execution worker stopped")
return
case <-time.After(e.pollInterval):
task, err := e.queue.Poll(ctx)
if err != nil {
if !errors.Is(errors.NotFound, err) {
e.logger.Error("error getting task from queue", zap.Error(err))
}
continue
}
logger := e.logger.With(
zap.String("taskID", task.ID),
zap.String("taskName", task.Name),
)
executedTask, err := e.Execute(ctx, task)
if err != nil {
logger.Error("error executing task", zap.Error(err))
if err := e.queue.Unack(ctx, task); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}
if len(executedTask.CacheKey) > 0 {
if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil {
// TODO(penkovski): should task execution repeat if result cannot be cached?
logger.Error("failed to write task result in cache", zap.Error(err))
}
}
if err := e.saveTaskHistory(ctx, executedTask); err != nil {
logger.Error("failed to save task in history", zap.Error(err))
}
}
}
}
func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) {
// TODO: evaluate request policy
t.StartedAt = time.Now()
status, response, err := e.execute(ctx, t)
if err != nil {
return nil, err
}
t.State = task.Done
t.ResponseCode = status
t.Response = response
if t.ResponsePolicy != "" {
// TODO: evaluate response policy
}
if t.FinalPolicy != "" {
// TODO: evaluate finalizer policy
}
t.FinishedAt = time.Now()
return t, nil
}
func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request))
if err != nil {
return 0, nil, errors.New("error creating http request", err)
}
resp, err := e.httpClient.Do(req)
if err != nil {
return 0, nil, errors.New("error executing http request", err)
}
defer resp.Body.Close()
response, err = io.ReadAll(resp.Body)
if err != nil {
return 0, nil, errors.New("error reading response body", err)
}
return resp.StatusCode, response, nil
}
func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error {
return errors.New("not implemented")
}
func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error {
return errors.New("not implemented")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment