Skip to content
Snippets Groups Projects
Commit daeacb84 authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#1 implement task execution

parent 3b371a45
No related tags found
No related merge requests found
package main package main
import ( import (
"context"
"log" "log"
"net/http" "net/http"
"time" "time"
...@@ -41,6 +42,7 @@ func main() { ...@@ -41,6 +42,7 @@ func main() {
) )
tasks := tasks.New(cache, logger) tasks := tasks.New(cache, logger)
go tasks.Start(context.Background())
handler := httpapi.New(tasks, logger) handler := httpapi.New(tasks, logger)
httpsrv := &http.Server{ httpsrv := &http.Server{
......
...@@ -3,6 +3,11 @@ package tasks ...@@ -3,6 +3,11 @@ package tasks
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -15,6 +20,8 @@ import ( ...@@ -15,6 +20,8 @@ import (
type Controller struct { type Controller struct {
logger *zap.Logger logger *zap.Logger
cache Cache cache Cache
taskChan chan Task
} }
type Cache interface { type Cache interface {
...@@ -26,8 +33,9 @@ type Cache interface { ...@@ -26,8 +33,9 @@ type Cache interface {
func New(cache Cache, logger *zap.Logger) *Controller { func New(cache Cache, logger *zap.Logger) *Controller {
return &Controller{ return &Controller{
logger: logger, logger: logger,
cache: cache, cache: cache,
taskChan: make(chan Task),
} }
} }
...@@ -36,15 +44,13 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, ...@@ -36,15 +44,13 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse,
// key not found in cache // key not found in cache
if err == cache.ErrNotFound { if err == cache.ErrNotFound {
task := createResolveTask(key) task := createResolveTask(key) // create task
bytes, err := json.Marshal(task) err = c.queueTask(ctx, task) // add task in queue
if err != nil {
return nil, errors.New("failed to encode task", err)
}
err = c.cache.Enqueue(ctx, string(bytes))
if err != nil { if err != nil {
return nil, errors.New("failed to enqueue task", err) return nil, errors.New("adding task to queue failed", err)
} }
c.logger.Info(fmt.Sprintf("task with ID: %s is added to queue", task.ID))
return &GetDataResponse{ID: task.ID}, nil return &GetDataResponse{ID: task.ID}, nil
} }
if err != nil { if err != nil {
...@@ -55,10 +61,109 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, ...@@ -55,10 +61,109 @@ func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse,
return &GetDataResponse{Data: data}, nil return &GetDataResponse{Data: data}, nil
} }
func createResolveTask(key string) *Task { func (c *Controller) Start(ctx context.Context) {
return &Task{ var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
c.executeWorker(ctx, c.taskChan)
}()
for {
select {
case <-ctx.Done():
wg.Wait()
return
case <-time.After(30 * time.Second):
var task Task
val, err := c.cache.Dequeue(ctx)
if val == "" {
continue
}
if err != nil {
c.logger.Error("failed to dequeue task", zap.Error(err))
continue
}
err = json.Unmarshal([]byte(val), &task)
if err != nil {
c.logger.Error("failed to decode task from queue", zap.Error(err))
continue
}
c.logger.Info(fmt.Sprintf("task with ID: %s is pulled out of the queue for execution", task.ID))
c.taskChan <- task
}
}
}
func (c *Controller) executeWorker(ctx context.Context, taskChan chan Task) {
for {
select {
case <-ctx.Done():
c.logger.Info("task executor worker stopped", zap.Error(ctx.Err()))
return
case task := <-c.taskChan:
err := c.executeTask(ctx, task)
if err != nil {
c.logger.Error("failed to execute task", zap.Error(err))
// TODO(kinkov): handle failed task properly
continue
}
}
}
}
func (c *Controller) executeTask(ctx context.Context, task Task) error {
var url string
var body io.Reader
if task.Method == "GET" {
url = task.Url + task.Body
}
// TODO(kinkov): implement POST method
req, err := http.NewRequest(task.Method, url, body)
if err != nil {
return err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
err = c.cache.Set(ctx, task.ID, string(bytes), 0)
if err != nil {
return err
}
err = c.cache.Set(ctx, task.Body, string(bytes), 0)
if err != nil {
return err
}
c.logger.Info(fmt.Sprintf("task with ID: %s is executed successfully", task.ID))
return nil
}
func (c *Controller) queueTask(ctx context.Context, task Task) error {
bytes, err := json.Marshal(task)
if err != nil {
return errors.New("failed to encode task", err)
}
err = c.cache.Enqueue(ctx, string(bytes)) // add task in queue
if err != nil {
return errors.New("failed to enqueue task", err)
}
return nil
}
func createResolveTask(key string) Task {
return Task{
ID: uuid.New().String(), ID: uuid.New().String(),
Url: "uni-resolver-web:8080", Url: "http://uni-resolver-web:8080/1.0/identifiers/",
Body: key, Body: key,
Method: "GET", Method: "GET",
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment