Skip to content
Snippets Groups Projects
Commit c1bcf22e authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

Merge branch '7-execute-task-for-cache-event' into 'main'

Execute task for each event from cache

Closes #7

See merge request eclipse/xfsc/tsa/task!12
parents ebcb6aa7 f81e92f3
Branches
No related tags found
No related merge requests found
...@@ -38,8 +38,14 @@ service, or by querying the task HTTP interface for task results. ...@@ -38,8 +38,14 @@ service, or by querying the task HTTP interface for task results.
* [Queue](docs/queue.md) * [Queue](docs/queue.md)
### Cache events
Task service is able to subscribe for events produced by the Cache service
and create a Task for every received event. Current implementation uses
[NATS](https://nats.io/) for messaging system.
##### More information
* [Cache Event Task](docs/cache-event-task.md)
### Tests and Linters ### Tests and Linters
......
...@@ -94,7 +94,7 @@ func main() { ...@@ -94,7 +94,7 @@ func main() {
var events *event.Client var events *event.Client
if cfg.Nats.Addr != "" { if cfg.Nats.Addr != "" {
events, err = event.New(cfg.Nats.Addr, cfg.Nats.Subject) events, err = event.New(storage, storage, cfg.Nats.Addr, cfg.Nats.Subject)
if err != nil { if err != nil {
logger.Fatal("failed to create events client", zap.Error(err)) logger.Fatal("failed to create events client", zap.Error(err))
} }
......
# Task Service - Cache Event Task Documentation
### Subscribe for Cache events
Current implementation uses NATS as a messaging system for events.
There are two environment variables that need to be set for subscribing for cache events.
```shell
NATS_ADDR="example.com:4222"
NATS_SUBJECT="subject"
```
### Event Task definition
In order to create a Task upon receiving a Cache event an `event task template` **must**
be available. Event task JSON templates are stored in Mongo database in a collection
named `eventTask`. Below is an example of event task template definition:
```json
{
"key": "did:web:did.actor:alice",
"namespace": "Login",
"scope": "Administration",
"taskName": "exampleTask"
}
```
The `taskName` field **must** be a valid `task definition` name. See: [Tasks](task.md)
### Create Task for Cache event
Every Cache event contains the `key`, `namespace`, and `scope` for a created/updated entry in cache.
The task service gets an `event task template` from storage, if available, and adds a Task in task queue
passing the metadata from the event. The added Task **must** execute a policy (rather than call an external URL).
The event metadata can be accessed inside the executed policy by key.
Example:
```
key := input.key
namespace := input.namespace
scope := input.scope
```
...@@ -3,18 +3,27 @@ package event ...@@ -3,18 +3,27 @@ package event
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "strings"
"time"
"github.com/cloudevents/sdk-go/protocol/nats/v2" "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2" cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"gitlab.eclipse.org/eclipse/xfsc/tsa/golib/errors"
taskpkg "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/service/task"
) )
const eventDataKey = "key"
type Client struct { type Client struct {
storage taskpkg.Storage
queue taskpkg.Queue
consumer *nats.Consumer consumer *nats.Consumer
events cloudevents.Client events cloudevents.Client
} }
func New(addr, subject string) (*Client, error) { func New(s taskpkg.Storage, q taskpkg.Queue, addr, subject string) (*Client, error) {
// create cloudevents NATS consumer // create cloudevents NATS consumer
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
c, err := nats.NewConsumer(addr, subject, nats.NatsOptions()) c, err := nats.NewConsumer(addr, subject, nats.NatsOptions())
...@@ -28,29 +37,92 @@ func New(addr, subject string) (*Client, error) { ...@@ -28,29 +37,92 @@ func New(addr, subject string) (*Client, error) {
} }
return &Client{ return &Client{
storage: s,
queue: q,
consumer: c, consumer: c,
events: e, events: e,
}, nil }, nil
} }
func (c *Client) Start(ctx context.Context) error { func (c *Client) Start(ctx context.Context) error {
return c.events.StartReceiver(ctx, handler) return c.events.StartReceiver(ctx, c.handler)
} }
func (c *Client) Close(ctx context.Context) error { func (c *Client) Close(ctx context.Context) error {
return c.consumer.Close(ctx) return c.consumer.Close(ctx)
} }
// handler is an example implementation. // handler is registered as a callback function when the client is started.
// Implementation will be done in https://gitlab.eclipse.org/eclipse/xfsc/tsa/task/-/issues/7 // It creates a task for execution when an event is received from the cache.
func handler(_ context.Context, event cloudevents.Event) error { func (c *Client) handler(ctx context.Context, event cloudevents.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context) if event.DataContentType() != "application/json" {
return errors.New("event data has invalid content type, must be application/json")
}
var data map[string]interface{} var data map[string]interface{}
if err := json.Unmarshal(event.Data(), &data); err != nil { if err := json.Unmarshal(event.Data(), &data); err != nil {
fmt.Printf("Got Data Error: %s\n", err.Error()) return err
}
cKey, ok := data[eventDataKey]
if !ok {
return errors.New("invalid event data key")
}
cacheKey, _ := cKey.(string)
sCacheKey := strings.Split(cacheKey, ",")
if len(sCacheKey) == 0 {
return errors.New("cache key cannot be empty")
}
key := sCacheKey[0]
var namespace, scope string
if len(sCacheKey) > 1 {
namespace = sCacheKey[1]
}
if len(sCacheKey) > 2 {
scope = sCacheKey[2]
}
// get event task template from storage
eventTask, err := c.storage.EventTask(ctx, key, namespace, scope)
if err != nil {
return err
}
// add task to task queue
if err := c.enqueueTask(ctx, eventTask); err != nil {
return err
}
return nil
}
func (c *Client) enqueueTask(ctx context.Context, eventTask *taskpkg.EventTask) error {
// get predefined task definition from storage
task, err := c.storage.TaskTemplate(ctx, eventTask.TaskName)
if err != nil {
return err
}
if task.RequestPolicy == "" {
return errors.New("event task must execute a policy")
}
input, err := json.Marshal(eventTask)
if err != nil {
return errors.New("error marshaling input to JSON", err)
}
task.ID = uuid.NewString()
task.State = taskpkg.Created
task.CreatedAt = time.Now()
task.Request = input
if err := c.queue.Add(ctx, task); err != nil {
return errors.New("failed to create task", err)
} }
fmt.Printf("Got Data: %+v\n", data)
return nil return nil
} }
...@@ -22,6 +22,7 @@ type Storage interface { ...@@ -22,6 +22,7 @@ type Storage interface {
TaskTemplate(ctx context.Context, taskName string) (*Task, error) TaskTemplate(ctx context.Context, taskName string) (*Task, error)
Task(ctx context.Context, taskID string) (*Task, error) Task(ctx context.Context, taskID string) (*Task, error)
TaskHistory(ctx context.Context, taskID string) (*Task, error) TaskHistory(ctx context.Context, taskID string) (*Task, error)
EventTask(ctx context.Context, key, namespace, scope string) (*EventTask, error)
} }
// Queue interface for retrieving, returning and removing tasks from Queue. // Queue interface for retrieving, returning and removing tasks from Queue.
......
...@@ -45,6 +45,13 @@ type Task struct { ...@@ -45,6 +45,13 @@ type Task struct {
FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done.
} }
type EventTask struct {
Key string `json:"key"`
Namespace string `json:"namespace"`
Scope string `json:"scope"`
TaskName string
}
// CacheKey constructs the key for storing task result in the cache. // CacheKey constructs the key for storing task result in the cache.
func (t *Task) CacheKey() string { func (t *Task) CacheKey() string {
key := t.ID key := t.ID
......
...@@ -9,6 +9,22 @@ import ( ...@@ -9,6 +9,22 @@ import (
) )
type FakeStorage struct { type FakeStorage struct {
EventTaskStub func(context.Context, string, string, string) (*task.EventTask, error)
eventTaskMutex sync.RWMutex
eventTaskArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 string
arg4 string
}
eventTaskReturns struct {
result1 *task.EventTask
result2 error
}
eventTaskReturnsOnCall map[int]struct {
result1 *task.EventTask
result2 error
}
TaskStub func(context.Context, string) (*task.Task, error) TaskStub func(context.Context, string) (*task.Task, error)
taskMutex sync.RWMutex taskMutex sync.RWMutex
taskArgsForCall []struct { taskArgsForCall []struct {
...@@ -55,6 +71,73 @@ type FakeStorage struct { ...@@ -55,6 +71,73 @@ type FakeStorage struct {
invocationsMutex sync.RWMutex invocationsMutex sync.RWMutex
} }
func (fake *FakeStorage) EventTask(arg1 context.Context, arg2 string, arg3 string, arg4 string) (*task.EventTask, error) {
fake.eventTaskMutex.Lock()
ret, specificReturn := fake.eventTaskReturnsOnCall[len(fake.eventTaskArgsForCall)]
fake.eventTaskArgsForCall = append(fake.eventTaskArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 string
arg4 string
}{arg1, arg2, arg3, arg4})
stub := fake.EventTaskStub
fakeReturns := fake.eventTaskReturns
fake.recordInvocation("EventTask", []interface{}{arg1, arg2, arg3, arg4})
fake.eventTaskMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeStorage) EventTaskCallCount() int {
fake.eventTaskMutex.RLock()
defer fake.eventTaskMutex.RUnlock()
return len(fake.eventTaskArgsForCall)
}
func (fake *FakeStorage) EventTaskCalls(stub func(context.Context, string, string, string) (*task.EventTask, error)) {
fake.eventTaskMutex.Lock()
defer fake.eventTaskMutex.Unlock()
fake.EventTaskStub = stub
}
func (fake *FakeStorage) EventTaskArgsForCall(i int) (context.Context, string, string, string) {
fake.eventTaskMutex.RLock()
defer fake.eventTaskMutex.RUnlock()
argsForCall := fake.eventTaskArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeStorage) EventTaskReturns(result1 *task.EventTask, result2 error) {
fake.eventTaskMutex.Lock()
defer fake.eventTaskMutex.Unlock()
fake.EventTaskStub = nil
fake.eventTaskReturns = struct {
result1 *task.EventTask
result2 error
}{result1, result2}
}
func (fake *FakeStorage) EventTaskReturnsOnCall(i int, result1 *task.EventTask, result2 error) {
fake.eventTaskMutex.Lock()
defer fake.eventTaskMutex.Unlock()
fake.EventTaskStub = nil
if fake.eventTaskReturnsOnCall == nil {
fake.eventTaskReturnsOnCall = make(map[int]struct {
result1 *task.EventTask
result2 error
})
}
fake.eventTaskReturnsOnCall[i] = struct {
result1 *task.EventTask
result2 error
}{result1, result2}
}
func (fake *FakeStorage) Task(arg1 context.Context, arg2 string) (*task.Task, error) { func (fake *FakeStorage) Task(arg1 context.Context, arg2 string) (*task.Task, error) {
fake.taskMutex.Lock() fake.taskMutex.Lock()
ret, specificReturn := fake.taskReturnsOnCall[len(fake.taskArgsForCall)] ret, specificReturn := fake.taskReturnsOnCall[len(fake.taskArgsForCall)]
...@@ -253,6 +336,8 @@ func (fake *FakeStorage) TaskTemplateReturnsOnCall(i int, result1 *task.Task, re ...@@ -253,6 +336,8 @@ func (fake *FakeStorage) TaskTemplateReturnsOnCall(i int, result1 *task.Task, re
func (fake *FakeStorage) Invocations() map[string][][]interface{} { func (fake *FakeStorage) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock() fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock() defer fake.invocationsMutex.RUnlock()
fake.eventTaskMutex.RLock()
defer fake.eventTaskMutex.RUnlock()
fake.taskMutex.RLock() fake.taskMutex.RLock()
defer fake.taskMutex.RUnlock() defer fake.taskMutex.RUnlock()
fake.taskHistoryMutex.RLock() fake.taskHistoryMutex.RLock()
......
...@@ -22,9 +22,11 @@ const ( ...@@ -22,9 +22,11 @@ const (
taskListQueue = "taskLists" taskListQueue = "taskLists"
taskListTemplates = "taskListTemplates" taskListTemplates = "taskListTemplates"
taskListHistory = "taskListHistory" taskListHistory = "taskListHistory"
eventTasks = "eventTasks"
) )
type Storage struct { type Storage struct {
eventTasks *mongo.Collection
taskTemplates *mongo.Collection taskTemplates *mongo.Collection
tasks *mongo.Collection tasks *mongo.Collection
tasksHistory *mongo.Collection tasksHistory *mongo.Collection
...@@ -35,6 +37,7 @@ type Storage struct { ...@@ -35,6 +37,7 @@ type Storage struct {
func New(db *mongo.Client) *Storage { func New(db *mongo.Client) *Storage {
return &Storage{ return &Storage{
eventTasks: db.Database(taskDB).Collection(eventTasks),
taskTemplates: db.Database(taskDB).Collection(taskTemplates), taskTemplates: db.Database(taskDB).Collection(taskTemplates),
tasks: db.Database(taskDB).Collection(taskQueue), tasks: db.Database(taskDB).Collection(taskQueue),
tasksHistory: db.Database(taskDB).Collection(tasksHistory), tasksHistory: db.Database(taskDB).Collection(tasksHistory),
...@@ -357,3 +360,25 @@ func (s *Storage) TaskListHistory(ctx context.Context, taskListID string) (*task ...@@ -357,3 +360,25 @@ func (s *Storage) TaskListHistory(ctx context.Context, taskListID string) (*task
return &list, nil return &list, nil
} }
func (s *Storage) EventTask(ctx context.Context, key, namespace, scope string) (*task.EventTask, error) {
result := s.eventTasks.FindOne(ctx, bson.M{
"key": key,
"namespace": namespace,
"scope": scope,
})
if result.Err() != nil {
if strings.Contains(result.Err().Error(), "no documents in result") {
return nil, errors.New(errors.NotFound, "eventTask not found")
}
return nil, result.Err()
}
var eventTask task.EventTask
if err := result.Decode(&eventTask); err != nil {
return nil, err
}
return &eventTask, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment