Skip to content
Snippets Groups Projects
Commit b751a473 authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

WIP: experiments

parent 418607d6
No related branches found
No related tags found
1 merge request!5Task execution experiments
Pipeline #50487 failed
package executor //package executor
//
import ( //import (
"bytes" // "bytes"
"context" // "context"
"io" // "io"
"net/http" // "net/http"
"sync" // "sync"
"time" // "time"
//
"go.uber.org/zap" // "go.uber.org/zap"
//
"code.vereign.com/gaiax/tsa/golib/errors" // "code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task" // "code.vereign.com/gaiax/tsa/task/internal/service/task"
) //)
//
const workers = 5 //const workers = 5
//
type Queue interface { //type Queue interface {
Poll(ctx context.Context) (*task.Task, error) // Poll(ctx context.Context) (*task.Task, error)
Ack(ctx context.Context, task *task.Task) error // Ack(ctx context.Context, task *task.Task) error
Unack(ctx context.Context, task *task.Task) error // Unack(ctx context.Context, task *task.Task) error
} //}
//
type Executor struct { //type Executor struct {
queue Queue // queue Queue
workers int // workers int
pollInterval time.Duration // pollInterval time.Duration
//
httpClient *http.Client // httpClient *http.Client
logger *zap.Logger // logger *zap.Logger
} //}
//
func New() *Executor { //func New() *Executor {
return &Executor{ // return &Executor{
httpClient: http.DefaultClient, // httpClient: http.DefaultClient,
} // }
} //}
//
func (e *Executor) Start(ctx context.Context) { //func (e *Executor) Start(ctx context.Context) {
if e.workers == 0 { // if e.workers == 0 {
e.workers = workers // e.workers = workers
} // }
//
var wg sync.WaitGroup // var wg sync.WaitGroup
for i := 0; i < e.workers; i++ { // for i := 0; i < e.workers; i++ {
wg.Add(1) // wg.Add(1)
go func() { // go func() {
defer wg.Done() // defer wg.Done()
e.Worker(ctx) // e.Worker(ctx)
}() // }()
} // }
//
<-ctx.Done() // <-ctx.Done()
wg.Wait() // wg.Wait()
//
e.logger.Info("task executor stopped") // e.logger.Info("task executor stopped")
} //}
//
func (e *Executor) Worker(ctx context.Context) { //func (e *Executor) Worker(ctx context.Context) {
for { // for {
select { // select {
case <-ctx.Done(): // case <-ctx.Done():
e.logger.Debug("task execution worker stopped") // e.logger.Debug("task execution worker stopped")
return // return
case <-time.After(e.pollInterval): // case <-time.After(e.pollInterval):
task, err := e.queue.Poll(ctx) // task, err := e.queue.Poll(ctx)
if err != nil { // if err != nil {
if !errors.Is(errors.NotFound, err) { // if !errors.Is(errors.NotFound, err) {
e.logger.Error("error getting task from queue", zap.Error(err)) // e.logger.Error("error getting task from queue", zap.Error(err))
} // }
continue // continue
} // }
//
logger := e.logger.With( // logger := e.logger.With(
zap.String("taskID", task.ID), // zap.String("taskID", task.ID),
zap.String("taskName", task.Name), // zap.String("taskName", task.Name),
) // )
//
executedTask, err := e.Execute(ctx, task) // executedTask, err := e.Execute(ctx, task)
if err != nil { // if err != nil {
logger.Error("error executing task", zap.Error(err)) // logger.Error("error executing task", zap.Error(err))
if err := e.queue.Unack(ctx, task); err != nil { // if err := e.queue.Unack(ctx, task); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err)) // logger.Error("failed to unack task in queue", zap.Error(err))
} // }
continue // continue
} // }
//
if len(executedTask.CacheKey) > 0 { // if len(executedTask.CacheKey) > 0 {
if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil { // if err := e.cacheTaskResult(ctx, executedTask.ID, executedTask.Response); err != nil {
// TODO(penkovski): should task execution repeat if result cannot be cached? // // TODO(penkovski): should task execution repeat if result cannot be cached?
logger.Error("failed to write task result in cache", zap.Error(err)) // logger.Error("failed to write task result in cache", zap.Error(err))
} // }
} // }
//
if err := e.saveTaskHistory(ctx, executedTask); err != nil { // if err := e.saveTaskHistory(ctx, executedTask); err != nil {
logger.Error("failed to save task in history", zap.Error(err)) // logger.Error("failed to save task in history", zap.Error(err))
} // }
} // }
} // }
} //}
//
func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) { //func (e *Executor) Execute(ctx context.Context, t *task.Task) (*task.Task, error) {
// TODO: evaluate request policy // // TODO: evaluate request policy
t.StartedAt = time.Now() // t.StartedAt = time.Now()
status, response, err := e.execute(ctx, t) // status, response, err := e.execute(ctx, t)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
//
t.State = task.Done // t.State = task.Done
t.ResponseCode = status // t.ResponseCode = status
t.Response = response // t.Response = response
//
if t.ResponsePolicy != "" { // if t.ResponsePolicy != "" {
// TODO: evaluate response policy // // TODO: evaluate response policy
} // }
//
if t.FinalPolicy != "" { // if t.FinalPolicy != "" {
// TODO: evaluate finalizer policy // // TODO: evaluate finalizer policy
} // }
//
t.FinishedAt = time.Now() // t.FinishedAt = time.Now()
return t, nil // return t, nil
} //}
//
func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) { //func (e *Executor) execute(ctx context.Context, t *task.Task) (status int, response []byte, err error) {
req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request)) // req, err := http.NewRequest(t.Method, t.URL, bytes.NewReader(t.Request))
if err != nil { // if err != nil {
return 0, nil, errors.New("error creating http request", err) // return 0, nil, errors.New("error creating http request", err)
} // }
//
resp, err := e.httpClient.Do(req) // resp, err := e.httpClient.Do(req)
if err != nil { // if err != nil {
return 0, nil, errors.New("error executing http request", err) // return 0, nil, errors.New("error executing http request", err)
} // }
defer resp.Body.Close() // defer resp.Body.Close()
//
response, err = io.ReadAll(resp.Body) // response, err = io.ReadAll(resp.Body)
if err != nil { // if err != nil {
return 0, nil, errors.New("error reading response body", err) // return 0, nil, errors.New("error reading response body", err)
} // }
//
return resp.StatusCode, response, nil // return resp.StatusCode, response, nil
} //}
//
func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error { //func (e *Executor) cacheTaskResult(ctx context.Context, taskID string, taskResult []byte) error {
return errors.New("not implemented") // return errors.New("not implemented")
} //}
//
func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error { //func (e *Executor) saveTaskHistory(ctx context.Context, task *task.Task) error {
return errors.New("not implemented") // return errors.New("not implemented")
} //}
package executor
import (
"context"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
const workers = 5
type Queue interface {
Poll(ctx context.Context) (*task.Task, error)
Ack(ctx context.Context, task *task.Task) error
Unack(ctx context.Context, task *task.Task) error
}
type Executor struct {
queue Queue
workers int
pollInterval time.Duration
httpClient *http.Client
logger *zap.Logger
}
func New() *Executor {
return &Executor{}
}
func (e *Executor) Start(ctx context.Context) error {
defer e.logger.Info("task executor stopped")
if e.workers == 0 {
e.workers = workers
}
var wg sync.WaitGroup
tasks := make(chan *task.Task)
for i := 0; i < e.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker := newWorker(tasks, e.queue, e.httpClient, e.logger)
worker.Start(ctx)
}()
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-time.After(e.pollInterval):
t, err := e.queue.Poll(ctx)
if err != nil {
if !errors.Is(errors.NotFound, err) {
e.logger.Error("error getting task from queue", zap.Error(err))
}
continue
}
tasks <- t // send task to the workers for execution
}
}
wg.Wait() // wait all workers to stop
return ctx.Err()
}
package executor
import (
"context"
"fmt"
"net/http"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
type Worker struct {
tasks chan *task.Task
queue Queue
httpClient *http.Client
logger *zap.Logger
}
func newWorker(tasks chan *task.Task, queue Queue, httpClient *http.Client, logger *zap.Logger) *Worker {
return &Worker{
tasks: tasks,
queue: queue,
httpClient: httpClient,
logger: logger,
}
}
func (w *Worker) Start(ctx context.Context) {
defer w.logger.Debug("task worker stopped")
for {
select {
case <-ctx.Done():
return
case t := <-w.tasks:
logger := w.logger.With(
zap.String("taskID", t.ID),
zap.String("taskName", t.Name),
)
executed, err := w.execute(ctx, t)
if err != nil {
logger.Error("error executing task", zap.Error(err))
if err := w.queue.Unack(ctx, t); err != nil {
logger.Error("failed to unack task in queue", zap.Error(err))
}
continue
}
if err := w.cacheResult(ctx, executed); err != nil {
// TODO: should we remove the task from queue or it should be retried?
logger.Error("error caching task result", zap.Error(err))
continue
}
if err := w.saveTaskHistory(ctx, executed); err != nil {
logger.Error("error saving task history", zap.Error(err))
continue
}
}
}
}
func (w *Worker) execute(ctx context.Context, task *task.Task) (*task.Task, error) {
return nil, fmt.Errorf("not implemented")
}
func (w *Worker) cacheResult(ctx context.Context, task *task.Task) error {
return fmt.Errorf("not implemented")
}
func (w *Worker) saveTaskHistory(ctx context.Context, task *task.Task) error {
return fmt.Errorf("not implemented")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment