//package executor // //import ( // "bytes" // "context" // "io" // "net/http" // "sync" // "time" // // "go.uber.org/zap" // // "code.vereign.com/gaiax/tsa/golib/errors" // "code.vereign.com/gaiax/tsa/task/internal/service/task" //) // //const workers = 5 // //type Queue interface { // Poll(ctx context.Context) (*task.Task, error) // Ack(ctx context.Context, task *task.Task) error // Unack(ctx context.Context, task *task.Task) error //} // //type Executor struct { // queue Queue // workers int // pollInterval time.Duration // // httpClient *http.Client // logger *zap.Logger //} // //func New() *Executor { // return &Executor{ // httpClient: http.DefaultClient, // } //} // //func (e *Executor) Start(ctx context.Context) { // if e.workers == 0 { // e.workers = workers // } // // var wg sync.WaitGroup // for i := 0; i < e.workers; i++ { // wg.Add(1) // go func() { // defer wg.Done() // e.Worker(ctx) // }() // } // // <-ctx.Done() // wg.Wait() // // e.logger.Info("task executor stopped") //} // //func (e *Executor) Worker(ctx context.Context) { // for { // select { // case <-ctx.Done(): // e.logger.Debug("task execution worker stopped") // return // case <-time.After(e.pollInterval): // task, err := e.queue.Poll(ctx) // if err != nil { // if !errors.Is(errors.NotFound, err) { // e.logger.Error("error getting task from queue", zap.Error(err)) // } // continue // } // // logger := e.logger.With( // zap.String("taskID", task.ID), // zap.String("taskName", task.Name), // ) // // executedTask, err := e.Execute(ctx, task) // if err != nil { // logger.Error("error executing task", zap.Error(err)) // if err := e.queue.Unack(ctx, task); err != nil { // logger.Error("failed to unack task in queue", zap.Error(err)) // } // continue // } // // if len(executedTask.CacheKey) > 0 { // if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { // // TODO(penkovski): should task execution repeat if result cannot be cached? // logger.Error("failed to write task result in cache", zap.Error(err)) // } // } // // if err := e.saveTaskHistory(ctx, executedTask); err != nil { // logger.Error("failed to save task in history", zap.Error(err)) // } // } // } //} // //func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { // // TODO: evaluate request policy // t.StartedAt = time.Now() // status, response, err := e.execute(ctx, t) // if err != nil { // return nil, err // } // // t.State = task.Done // t.ResponseCode = status // t.Response = response // // if t.ResponsePolicy != "" { // // TODO: evaluate response policy // } // // if t.FinalPolicy != "" { // // TODO: evaluate finalizer policy // } // // t.FinishedAt = time.Now() // return t, nil //} // //func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { // req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) // if err != nil { // return 0, nil, errors.New("error creating http request", err) // } // // resp, err := e.httpClient.Do(req) // if err != nil { // return 0, nil, errors.New("error executing http request", err) // } // defer resp.Body.Close() // // response, err = io.ReadAll(resp.Body) // if err != nil { // return 0, nil, errors.New("error reading response body", err) // } // // return resp.StatusCode, response, nil //} // //func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { // return errors.New("not implemented") //} // //func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { // return errors.New("not implemented") //}