package executor

import (
	"context"
	"net/http"
	"sync"
	"time"

	"go.uber.org/zap"

	"code.vereign.com/gaiax/tsa/golib/errors"
	"code.vereign.com/gaiax/tsa/task/internal/service/task"
)

// Policy is the policy client the executor depends on. The executor itself
// only forwards it to the workers it creates.
type Policy interface {
	// Evaluate presumably runs the named policy against the given input data
	// and returns the raw evaluation result — NOTE(review): confirm the exact
	// meaning of the policy string and data encoding against the implementation.
	Evaluate(ctx context.Context, policy string, data []byte) ([]byte, error)
}

// Queue allows retrieving, returning and deleting tasks from storage.
type Queue interface {
	// Poll retrieves the next task for execution. The executor treats an
	// error matching errors.NotFound as "no task available" and does not
	// log it; any other error is logged.
	Poll(ctx context.Context) (*task.Task, error)
	// Ack presumably acknowledges a task and deletes it from storage —
	// NOTE(review): confirm against the implementation.
	Ack(ctx context.Context, task *task.Task) error
	// Unack presumably returns a task to storage so it can be polled again —
	// NOTE(review): confirm against the implementation.
	Unack(ctx context.Context, task *task.Task) error
}

// Executor polls a task queue at a fixed interval and distributes the
// retrieved tasks to a pool of worker goroutines.
type Executor struct {
	// queue is polled for tasks and is also handed to every worker.
	queue        Queue
	// policy is the policy client handed to every worker.
	policy       Policy
	// workers is the number of worker goroutines launched by Start.
	workers      int
	// pollInterval is the time to wait before each queue poll.
	pollInterval time.Duration

	// httpClient is handed to every worker.
	httpClient *http.Client
	// logger is used by the executor and handed to every worker.
	logger     *zap.Logger
}

// New assembles an Executor from its dependencies. Nothing is started here;
// call Start to launch the workers and begin polling the queue.
func New(
	queue Queue,
	policy Policy,
	workers int,
	pollInterval time.Duration,
	httpClient *http.Client,
	logger *zap.Logger,
) *Executor {
	exec := new(Executor)

	// Task source and processing dependencies.
	exec.queue = queue
	exec.policy = policy

	// Concurrency and polling configuration.
	exec.workers = workers
	exec.pollInterval = pollInterval

	// Shared infrastructure.
	exec.httpClient = httpClient
	exec.logger = logger

	return exec
}

// Start runs the executor until ctx is cancelled.
//
// It launches e.workers worker goroutines, each of which is given a shared
// unbuffered task channel, and then polls the queue once per pollInterval,
// handing every retrieved task to the workers through that channel.
//
// Start blocks until ctx is done and all worker goroutines have returned, and
// then returns ctx.Err(). A task that was polled but could not be handed to a
// worker before cancellation is returned to the queue with Unack.
func (e *Executor) Start(ctx context.Context) error {
	defer e.logger.Info("task executor stopped")

	// Upper bound for returning an undelivered task to the queue on shutdown.
	// The cancelled ctx cannot be used for that call, so a fresh, bounded
	// context is created instead.
	const unackTimeout = 5 * time.Second

	var wg sync.WaitGroup
	tasks := make(chan *task.Task)
	for i := 0; i < e.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker := newWorker(tasks, e.queue, e.policy, e.httpClient, e.logger)
			worker.Start(ctx)
		}()
	}

	// One reusable timer instead of a new time.After timer per iteration.
	timer := time.NewTimer(e.pollInterval)
	defer timer.Stop()

	// The loop's post statement re-arms the timer after every iteration,
	// including those ended by continue. The timer channel has always been
	// drained at that point, so Reset is safe.
loop:
	for ; ; timer.Reset(e.pollInterval) {
		select {
		case <-ctx.Done():
			break loop
		case <-timer.C:
		}

		t, err := e.queue.Poll(ctx)
		if err != nil {
			if !errors.Is(errors.NotFound, err) {
				e.logger.Error("error getting task from queue", zap.Error(err))
			}
			continue
		}

		// Hand the task to a worker, but never block past cancellation:
		// an unconditional send here would hang Start forever when no
		// worker is receiving anymore.
		select {
		case tasks <- t: // send task to the workers for execution
		case <-ctx.Done():
			unackCtx, cancel := context.WithTimeout(context.Background(), unackTimeout)
			if err := e.queue.Unack(unackCtx, t); err != nil {
				e.logger.Error("error returning task to queue", zap.Error(err))
			}
			cancel()
			break loop
		}
	}

	wg.Wait() // wait all workers to stop

	return ctx.Err()
}