Skip to content
Snippets Groups Projects
executor.go 1.74 KiB
Newer Older
  • Learn to ignore specific revisions
  • package executor
    
    import (
    	"context"
    	"net/http"
    	"sync"
    	"time"
    
    	"go.uber.org/zap"
    
    	"code.vereign.com/gaiax/tsa/golib/errors"
    	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    )
    
    
    // Policy client.
    
    type Policy interface {
    	Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
    }
    
    
    // Queue allows retrieving, returning and deleting tasks from storage.
    
    type Queue interface {
    	Poll(ctx context.Context) (*task.Task, error)
    	Ack(ctx context.Context, task *task.Task) error
    	Unack(ctx context.Context, task *task.Task) error
    }
    
    type Executor struct {
    	queue        Queue
    	policy       Policy
    	workers      int
    	pollInterval time.Duration
    
    	httpClient *http.Client
    	logger     *zap.Logger
    }
    
    func New(
    	queue Queue,
    	policy Policy,
    	workers int,
    	pollInterval time.Duration,
    	httpClient *http.Client,
    	logger *zap.Logger,
    ) *Executor {
    	return &Executor{
    		queue:        queue,
    		policy:       policy,
    		workers:      workers,
    		pollInterval: pollInterval,
    		httpClient:   httpClient,
    		logger:       logger,
    	}
    }
    
    func (e *Executor) Start(ctx context.Context) error {
    	defer e.logger.Info("task executor stopped")
    
    	var wg sync.WaitGroup
    	tasks := make(chan *task.Task)
    	for i := 0; i < e.workers; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
    			worker.Start(ctx)
    		}()
    	}
    
    loop:
    	for {
    		select {
    		case <-ctx.Done():
    			break loop
    		case <-time.After(e.pollInterval):
    			t, err := e.queue.Poll(ctx)
    			if err != nil {
    				if !errors.Is(errors.NotFound, err) {
    					e.logger.Error("error getting task from queue", zap.Error(err))
    				}
    				continue
    			}
    			tasks <- t // send task to the workers for execution
    		}
    	}
    
    	wg.Wait() // wait all workers to stop
    
    	return ctx.Err()
    }