diff --git a/cmd/infohub/main.go b/cmd/infohub/main.go index 9708162237c5c8b89e628324ac20864400c10bf0..89382d19aa3fda6f937f4b8f434b416bf536392e 100644 --- a/cmd/infohub/main.go +++ b/cmd/infohub/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "net/http" "time" @@ -41,6 +42,7 @@ func main() { ) tasks := tasks.New(cache, logger) + go tasks.Start(context.Background()) handler := httpapi.New(tasks, logger) httpsrv := &http.Server{ diff --git a/internal/tasks/controller.go b/internal/tasks/controller.go index c7991ee1561d7ce89391a2b78bed17a3a4d3dcb9..aa74b5cdf3f06f621d19d3b61dd57aa9b8827b48 100644 --- a/internal/tasks/controller.go +++ b/internal/tasks/controller.go @@ -3,6 +3,11 @@ package tasks import ( "context" "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" "time" "github.com/google/uuid" @@ -15,6 +20,8 @@ import ( type Controller struct { logger *zap.Logger cache Cache + + taskChan chan Task } type Cache interface { @@ -26,8 +33,9 @@ type Cache interface { func New(cache Cache, logger *zap.Logger) *Controller { return &Controller{ - logger: logger, - cache: cache, + logger: logger, + cache: cache, + taskChan: make(chan Task), } } @@ -36,15 +44,13 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, // key not found in cache if err == cache.ErrNotFound { - task := createResolveTask(key) - bytes, err := json.Marshal(task) - if err != nil { - return nil, errors.New("failed to encode task", err) - } - err = c.cache.Enqueue(ctx, string(bytes)) + task := createResolveTask(key) // create task + err = c.queueTask(ctx, task) // add task in queue if err != nil { - return nil, errors.New("failed to enqueue task", err) + return nil, errors.New("adding task to queue failed", err) } + c.logger.Info(fmt.Sprintf("task with ID: %s is added to queue", task.ID)) + return &GetDataResponse{ID: task.ID}, nil } if err != nil { @@ -55,10 +61,109 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, return &GetDataResponse{Data: data}, nil } -func createResolveTask(key string) *Task { - return &Task{ +func (c *Controller) Start(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + c.executeWorker(ctx, c.taskChan) + }() + + for { + select { + case <-ctx.Done(): + wg.Wait() + return + case <-time.After(30 * time.Second): + var task Task + val, err := c.cache.Dequeue(ctx) + if val == "" { + continue + } + if err != nil { + c.logger.Error("failed to dequeue task", zap.Error(err)) + continue + } + err = json.Unmarshal([]byte(val), &task) + if err != nil { + c.logger.Error("failed to decode task from queue", zap.Error(err)) + continue + } + c.logger.Info(fmt.Sprintf("task with ID: %s is pulled out of the queue for execution", task.ID)) + c.taskChan <- task + } + + } + +} + +func (c *Controller) executeWorker(ctx context.Context, taskChan chan Task) { + for { + select { + case <-ctx.Done(): + c.logger.Info("task executor worker stopped", zap.Error(ctx.Err())) + return + case task := <-c.taskChan: + err := c.executeTask(ctx, task) + if err != nil { + c.logger.Error("failed to execute task", zap.Error(err)) + // TODO(kinkov): handle failed task properly + continue + } + } + } +} + +func (c *Controller) executeTask(ctx context.Context, task Task) error { + var url string + var body io.Reader + if task.Method == "GET" { + url = task.Url + task.Body + } + // TODO(kinkov): implement POST method + + req, err := http.NewRequest(task.Method, url, body) + if err != nil { + return err + } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + bytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + err = c.cache.Set(ctx, task.ID, string(bytes), 0) + if err != nil { + return err + } + err = c.cache.Set(ctx, task.Body, string(bytes), 0) + if err != nil { + return err + } + c.logger.Info(fmt.Sprintf("task with ID: %s is executed successfully", task.ID)) + return nil +} + +func (c *Controller) queueTask(ctx context.Context, task Task) error { + bytes, err := json.Marshal(task) + if err != nil { + return errors.New("failed to encode task", err) + } + err = c.cache.Enqueue(ctx, string(bytes)) // add task in queue + if err != nil { + return errors.New("failed to enqueue task", err) + } + return nil +} + +func createResolveTask(key string) Task { + return Task{ ID: uuid.New().String(), - Url: "uni-resolver-web:8080", + Url: "http://uni-resolver-web:8080/1.0/identifiers/", Body: key, Method: "GET", }