Skip to content
Snippets Groups Projects
executor.go 2.26 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    package executor
    
    import (
    	"context"
    	"net/http"
    	"sync"
    	"time"
    
    	"go.uber.org/zap"
    
    	"code.vereign.com/gaiax/tsa/golib/errors"
    	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    )
    
    
    // Policy client.
    
    type Policy interface {
    	Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
    }
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    
    
    // Queue allows retrieving, returning and deleting tasks from storage.
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    type Queue interface {
    	Poll(ctx context.Context) (*task.Task, error)
    	Ack(ctx context.Context, task *task.Task) error
    	Unack(ctx context.Context, task *task.Task) error
    }
    
    
    type Storage interface {
    	SaveTaskHistory(ctx context.Context, task *task.Task) error
    }
    
    
    type Cache interface {
    	Set(ctx context.Context, key, namespace, scope string, value []byte) error
    	Get(ctx context.Context, key, namespace, scope string) ([]byte, error)
    }
    
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    type Executor struct {
    
    	queue          Queue
    	policy         Policy
    	storage        Storage
    	cache          Cache
    	workers        int
    	pollInterval   time.Duration
    	maxTaskRetries int
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    
    	httpClient *http.Client
    	logger     *zap.Logger
    }
    
    
    func New(
    	queue Queue,
    	policy Policy,
    
    	cache Cache,
    
    	workers int,
    	pollInterval time.Duration,
    
    	maxTaskRetries int,
    
    	httpClient *http.Client,
    	logger *zap.Logger,
    ) *Executor {
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    	return &Executor{
    
    		queue:          queue,
    		policy:         policy,
    		storage:        storage,
    		cache:          cache,
    		workers:        workers,
    		pollInterval:   pollInterval,
    		maxTaskRetries: maxTaskRetries,
    		httpClient:     httpClient,
    		logger:         logger,
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    	}
    }
    
    
    func (e *Executor) Start(ctx context.Context) error {
    	defer e.logger.Info("task executor stopped")
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    
    	var wg sync.WaitGroup
    
    	tasks := make(chan *task.Task)
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    	for i := 0; i < e.workers; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    
    			worker := newWorker(tasks, e.queue, e.policy, e.storage, e.cache, e.maxTaskRetries, e.httpClient, e.logger)
    
    			worker.Start(ctx)
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    		}()
    	}
    
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    	for {
    		select {
    		case <-ctx.Done():
    
    			break loop
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    		case <-time.After(e.pollInterval):
    
    			t, err := e.queue.Poll(ctx)
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    			if err != nil {
    				if !errors.Is(errors.NotFound, err) {
    					e.logger.Error("error getting task from queue", zap.Error(err))
    				}
    				continue
    			}
    
    			tasks <- t // send task to the workers for execution
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    		}
    	}
    
    
    	wg.Wait() // wait all workers to stop
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    
    
    	return ctx.Err()
    
    Lyuben Penkovski's avatar
    Lyuben Penkovski committed
    }