Skip to content
Snippets Groups Projects
executor.go 3.64 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    //package executor
    //
    //import (
    //	"bytes"
    //	"context"
    //	"io"
    //	"net/http"
    //	"sync"
    //	"time"
    //
    //	"go.uber.org/zap"
    //
    //	"code.vereign.com/gaiax/tsa/golib/errors"
    //	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    //)
    //
    //const workers = 5
    //
    //type Queue interface {
    //	Poll(ctx context.Context) (*task.Task, error)
    //	Ack(ctx context.Context, task *task.Task) error
    //	Unack(ctx context.Context, task *task.Task) error
    //}
    //
    //type Executor struct {
    //	queue        Queue
    //	workers      int
    //	pollInterval time.Duration
    //
    //	httpClient *http.Client
    //	logger     *zap.Logger
    //}
    //
    //func New() *Executor {
    //	return &Executor{
    //		httpClient: http.DefaultClient,
    //	}
    //}
    //
    //func (e *Executor) Start(ctx context.Context) {
    //	if e.workers == 0 {
    //		e.workers = workers
    //	}
    //
    //	var wg sync.WaitGroup
    //	for i := 0; i < e.workers; i++ {
    //		wg.Add(1)
    //		go func() {
    //			defer wg.Done()
    //			e.Worker(ctx)
    //		}()
    //	}
    //
    //	<-ctx.Done()
    //	wg.Wait()
    //
    //	e.logger.Info("task executor stopped")
    //}
    //
    //func (e *Executor) Worker(ctx context.Context) {
    //	for {
    //		select {
    //		case <-ctx.Done():
    //			e.logger.Debug("task execution worker stopped")
    //			return
    //		case <-time.After(e.pollInterval):
    //			task, err := e.queue.Poll(ctx)
    //			if err != nil {
    //				if !errors.Is(errors.NotFound, err) {
    //					e.logger.Error("error getting task from queue", zap.Error(err))
    //				}
    //				continue
    //			}
    //
    //			logger := e.logger.With(
    //				zap.String("taskID", task.ID),
    //				zap.String("taskName", task.Name),
    //			)
    //
    //			executedTask, err := e.Execute(ctx, task)
    //			if err != nil {
    //				logger.Error("error executing task", zap.Error(err))
    //				if err := e.queue.Unack(ctx, task); err != nil {
    //					logger.Error("failed to unack task in queue", zap.Error(err))
    //				}
    //				continue
    //			}
    //
    //			if len(executedTask.CacheKey) > 0 {
    //				if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil {
    //					// TODO(penkovski): should task execution repeat if result cannot be cached?
    //					logger.Error("failed to write task result in cache", zap.Error(err))
    //				}
    //			}
    //
    //			if err := e.saveTaskHistory(ctx, executedTask); err != nil {
    //				logger.Error("failed to save task in history", zap.Error(err))
    //			}
    //		}
    //	}
    //}
    //
    //func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) {
    //	// TODO: evaluate request policy
    //	t.StartedAt = time.Now()
    //	status, response, err := e.execute(ctx, t)
    //	if err != nil {
    //		return nil, err
    //	}
    //
    //	t.State = task.Done
    //	t.ResponseCode = status
    //	t.Response = response
    //
    //	if t.ResponsePolicy != "" {
    //		// TODO: evaluate response policy
    //	}
    //
    //	if t.FinalPolicy != "" {
    //		// TODO: evaluate finalizer policy
    //	}
    //
    //	t.FinishedAt = time.Now()
    //	return t, nil
    //}
    //
    //func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) {
    //	req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request))
    //	if err != nil {
    //		return 0, nil, errors.New("error creating http request", err)
    //	}
    //
    //	resp, err := e.httpClient.Do(req)
    //	if err != nil {
    //		return 0, nil, errors.New("error executing http request", err)
    //	}
    //	defer resp.Body.Close()
    //
    //	response, err = io.ReadAll(resp.Body)
    //	if err != nil {
    //		return 0, nil, errors.New("error reading response body", err)
    //	}
    //
    //	return resp.StatusCode, response, nil
    //}
    //
    //func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error {
    //	return errors.New("not implemented")
    //}
    //
    //func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error {
    //	return errors.New("not implemented")
    //}