Skip to content
Snippets Groups Projects
executor.go 3.34 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    package executor
    
    import (
    	"bytes"
    	"context"
    	"io"
    	"net/http"
    	"sync"
    	"time"
    
    	"go.uber.org/zap"
    
    	"code.vereign.com/gaiax/tsa/golib/errors"
    	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    )
    
    const workers = 5
    
    type Queue interface {
    	Poll(ctx context.Context) (*task.Task, error)
    	Ack(ctx context.Context, task *task.Task) error
    	Unack(ctx context.Context, task *task.Task) error
    }
    
    type Executor struct {
    	queue        Queue
    	workers      int
    	pollInterval time.Duration
    
    	httpClient *http.Client
    	logger     *zap.Logger
    }
    
    func New() *Executor {
    	return &Executor{
    		httpClient: http.DefaultClient,
    	}
    }
    
    func (e *Executor) Start(ctx context.Context) {
    	if e.workers == 0 {
    		e.workers = workers
    	}
    
    	var wg sync.WaitGroup
    	for i := 0; i < e.workers; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			e.Worker(ctx)
    		}()
    	}
    
    	<-ctx.Done()
    	wg.Wait()
    
    	e.logger.Info("task executor stopped")
    }
    
    func (e *Executor) Worker(ctx context.Context) {
    	for {
    		select {
    		case <-ctx.Done():
    			e.logger.Debug("task execution worker stopped")
    			return
    		case <-time.After(e.pollInterval):
    			task, err := e.queue.Poll(ctx)
    			if err != nil {
    				if !errors.Is(errors.NotFound, err) {
    					e.logger.Error("error getting task from queue", zap.Error(err))
    				}
    				continue
    			}
    
    			logger := e.logger.With(
    				zap.String("taskID", task.ID),
    				zap.String("taskName", task.Name),
    			)
    
    			executedTask, err := e.Execute(ctx, task)
    			if err != nil {
    				logger.Error("error executing task", zap.Error(err))
    				if err := e.queue.Unack(ctx, task); err != nil {
    					logger.Error("failed to unack task in queue", zap.Error(err))
    				}
    				continue
    			}
    
    			if len(executedTask.CacheKey) > 0 {
    				if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil {
    					// TODO(penkovski): should task execution repeat if result cannot be cached?
    					logger.Error("failed to write task result in cache", zap.Error(err))
    				}
    			}
    
    			if err := e.saveTaskHistory(ctx, executedTask); err != nil {
    				logger.Error("failed to save task in history", zap.Error(err))
    			}
    		}
    	}
    }
    
    func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) {
    	// TODO: evaluate request policy
    	t.StartedAt = time.Now()
    	status, response, err := e.execute(ctx, t)
    	if err != nil {
    		return nil, err
    	}
    
    	t.State = task.Done
    	t.ResponseCode = status
    	t.Response = response
    
    	if t.ResponsePolicy != "" {
    		// TODO: evaluate response policy
    	}
    
    	if t.FinalPolicy != "" {
    		// TODO: evaluate finalizer policy
    	}
    
    	t.FinishedAt = time.Now()
    	return t, nil
    }
    
    func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) {
    	req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request))
    	if err != nil {
    		return 0, nil, errors.New("error creating http request", err)
    	}
    
    	resp, err := e.httpClient.Do(req)
    	if err != nil {
    		return 0, nil, errors.New("error executing http request", err)
    	}
    	defer resp.Body.Close()
    
    	response, err = io.ReadAll(resp.Body)
    	if err != nil {
    		return 0, nil, errors.New("error reading response body", err)
    	}
    
    	return resp.StatusCode, response, nil
    }
    
    func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error {
    	return errors.New("not implemented")
    }
    
    func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error {
    	return errors.New("not implemented")
    }