From daeacb843320f5dfc1b666cffe7e2d5348fa53cf Mon Sep 17 00:00:00 2001
From: Yordan Kinkov <yordan.kinkov@vereign.com>
Date: Thu, 24 Feb 2022 15:11:37 +0200
Subject: [PATCH] #1 implement task execution

---
 cmd/infohub/main.go          |   2 +
 internal/tasks/controller.go | 129 +++++++++++++++++++++++++++++++----
 2 files changed, 119 insertions(+), 12 deletions(-)

diff --git a/cmd/infohub/main.go b/cmd/infohub/main.go
index 9708162..89382d1 100644
--- a/cmd/infohub/main.go
+++ b/cmd/infohub/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"log"
 	"net/http"
 	"time"
@@ -41,6 +42,7 @@ func main() {
 	)
 
 	tasks := tasks.New(cache, logger)
+	go tasks.Start(context.Background())
 
 	handler := httpapi.New(tasks, logger)
 	httpsrv := &http.Server{
diff --git a/internal/tasks/controller.go b/internal/tasks/controller.go
index c7991ee..aa74b5c 100644
--- a/internal/tasks/controller.go
+++ b/internal/tasks/controller.go
@@ -3,6 +3,11 @@ package tasks
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -15,6 +20,8 @@ import (
 type Controller struct {
 	logger *zap.Logger
 	cache  Cache
+
+	taskChan chan Task
 }
 
 type Cache interface {
@@ -26,8 +33,9 @@ type Cache interface {
 
 func New(cache Cache, logger *zap.Logger) *Controller {
 	return &Controller{
-		logger: logger,
-		cache:  cache,
+		logger:   logger,
+		cache:    cache,
+		taskChan: make(chan Task),
 	}
 }
 
@@ -36,15 +44,13 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse,
 
 	// key not found in cache
 	if err == cache.ErrNotFound {
-		task := createResolveTask(key)
-		bytes, err := json.Marshal(task)
-		if err != nil {
-			return nil, errors.New("failed to encode task", err)
-		}
-		err = c.cache.Enqueue(ctx, string(bytes))
+		task := createResolveTask(key) // create task
+		err = c.queueTask(ctx, task)   // add task in queue
 		if err != nil {
-			return nil, errors.New("failed to enqueue task", err)
+			return nil, errors.New("adding task to queue failed", err)
 		}
+		c.logger.Info(fmt.Sprintf("task with ID: %s is added to queue", task.ID))
+
 		return &GetDataResponse{ID: task.ID}, nil
 	}
 	if err != nil {
@@ -55,10 +61,109 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse,
 	return &GetDataResponse{Data: data}, nil
 }
 
-func createResolveTask(key string) *Task {
-	return &Task{
+func (c *Controller) Start(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		c.executeWorker(ctx, c.taskChan)
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			wg.Wait()
+			return
+		case <-time.After(30 * time.Second):
+			var task Task
+			val, err := c.cache.Dequeue(ctx)
+			if val == "" {
+				continue
+			}
+			if err != nil {
+				c.logger.Error("failed to dequeue task", zap.Error(err))
+				continue
+			}
+			err = json.Unmarshal([]byte(val), &task)
+			if err != nil {
+				c.logger.Error("failed to decode task from queue", zap.Error(err))
+				continue
+			}
+			c.logger.Info(fmt.Sprintf("task with ID: %s is pulled out of the queue for execution", task.ID))
+			c.taskChan <- task
+		}
+
+	}
+
+}
+
+func (c *Controller) executeWorker(ctx context.Context, taskChan chan Task) {
+	for {
+		select {
+		case <-ctx.Done():
+			c.logger.Info("task executor worker stopped", zap.Error(ctx.Err()))
+			return
+		case task := <-c.taskChan:
+			err := c.executeTask(ctx, task)
+			if err != nil {
+				c.logger.Error("failed to execute task", zap.Error(err))
+				// TODO(kinkov): handle failed task properly
+				continue
+			}
+		}
+	}
+}
+
+func (c *Controller) executeTask(ctx context.Context, task Task) error {
+	var url string
+	var body io.Reader
+	if task.Method == "GET" {
+		url = task.Url + task.Body
+	}
+	// TODO(kinkov): implement POST method
+
+	req, err := http.NewRequest(task.Method, url, body)
+	if err != nil {
+		return err
+	}
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	bytes, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+
+	err = c.cache.Set(ctx, task.ID, string(bytes), 0)
+	if err != nil {
+		return err
+	}
+	err = c.cache.Set(ctx, task.Body, string(bytes), 0)
+	if err != nil {
+		return err
+	}
+	c.logger.Info(fmt.Sprintf("task with ID: %s is executed successfully", task.ID))
+	return nil
+}
+
+func (c *Controller) queueTask(ctx context.Context, task Task) error {
+	bytes, err := json.Marshal(task)
+	if err != nil {
+		return errors.New("failed to encode task", err)
+	}
+	err = c.cache.Enqueue(ctx, string(bytes)) // add task in queue
+	if err != nil {
+		return errors.New("failed to enqueue task", err)
+	}
+	return nil
+}
+
+func createResolveTask(key string) Task {
+	return Task{
 		ID:     uuid.New().String(),
-		Url:    "uni-resolver-web:8080",
+		Url:    "http://uni-resolver-web:8080/1.0/identifiers/",
 		Body:   key,
 		Method: "GET",
 	}
-- 
GitLab