diff --git a/README.md b/README.md index ce1487df73ee27d555989234d765c04b85911c58..bb9489e75347321409dd98e959026b3183dd7df1 100644 --- a/README.md +++ b/README.md @@ -38,8 +38,14 @@ service, or by querying the task HTTP interface for task results. * [Queue](docs/queue.md) +### Cache events +Task service is able to subscribe for events produced by the Cache service +and create a Task for every received event. Current implementation uses +[NATS](https://nats.io/) for messaging system. +##### More information +* [Cache Event Task](docs/cache-event-task.md) ### Tests and Linters diff --git a/cmd/task/main.go b/cmd/task/main.go index 9a2c3d90fec79faf6f355f792d5630e4b19da0f1..554d289b521c348d30a93f6e07428ea0ec2f0ae3 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -94,7 +94,7 @@ func main() { var events *event.Client if cfg.Nats.Addr != "" { - events, err = event.New(cfg.Nats.Addr, cfg.Nats.Subject) + events, err = event.New(storage, storage, cfg.Nats.Addr, cfg.Nats.Subject) if err != nil { logger.Fatal("failed to create events client", zap.Error(err)) } diff --git a/docs/cache-event-task.md b/docs/cache-event-task.md new file mode 100644 index 0000000000000000000000000000000000000000..7c6ad78e89050c612fae47ffd7c8a23522674a77 --- /dev/null +++ b/docs/cache-event-task.md @@ -0,0 +1,41 @@ +# Task Service - Cache Event Task Documentation + +### Subscribe for Cache events + +Current implementation uses NATS as a messaging system for events. +There are two environment variables that need to be set for subscribing for cache events. + +```shell +NATS_ADDR="example.com:4222" +NATS_SUBJECT="subject" +``` + +### Event Task definition + +In order to create a Task upon receiving a Cache event an `event task template` **must** +be available. Event task JSON templates are stored in Mongo database in a collection +named `eventTask`. Below is an example of event task template definition: + +```json +{ + "key": "did:web:did.actor:alice", + "namespace": "Login", + "scope": "Administration", + "taskName": "exampleTask" +} +``` + +The `taskName` field **must** be a valid `task definition` name. See: [Tasks](task.md) + +### Create Task for Cache event + +Every Cache event contains the `key`, `namespace`, and `scope` for a created/updated entry in cache. +The task service gets an `event task template` from storage, if available, and adds a Task in task queue +passing the metadata from the event. The added Task **must** execute a policy (rather than call an external URL). +The event metadata can be accessed inside the executed policy by key. +Example: +``` +key := input.key +namespace := input.namespace +scope := input.scope +``` diff --git a/internal/clients/event/client.go b/internal/clients/event/client.go index 6058795340a32cae7408b21a0e30c43a57ae628a..8c1b7669b1ffcb88448315b3997e1a12cce4a3ab 100644 --- a/internal/clients/event/client.go +++ b/internal/clients/event/client.go @@ -3,18 +3,27 @@ package event import ( "context" "encoding/json" - "fmt" + "strings" + "time" "github.com/cloudevents/sdk-go/protocol/nats/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + + "gitlab.eclipse.org/eclipse/xfsc/tsa/golib/errors" + taskpkg "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/service/task" ) +const eventDataKey = "key" + type Client struct { + storage taskpkg.Storage + queue taskpkg.Queue consumer *nats.Consumer events cloudevents.Client } -func New(addr, subject string) (*Client, error) { +func New(s taskpkg.Storage, q taskpkg.Queue, addr, subject string) (*Client, error) { // create cloudevents NATS consumer // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol c, err := nats.NewConsumer(addr, subject, nats.NatsOptions()) @@ -28,29 +37,92 @@ func New(addr, subject string) (*Client, error) { } return &Client{ + storage: s, + queue: q, consumer: c, events: e, }, nil } func (c *Client) Start(ctx context.Context) error { - return c.events.StartReceiver(ctx, handler) + return c.events.StartReceiver(ctx, c.handler) } func (c *Client) Close(ctx context.Context) error { return c.consumer.Close(ctx) } -// handler is an example implementation. -// Implementation will be done in https://gitlab.eclipse.org/eclipse/xfsc/tsa/task/-/issues/7 -func handler(_ context.Context, event cloudevents.Event) error { - fmt.Printf("Got Event Context: %+v\n", event.Context) +// handler is registered as a callback function when the client is started. +// It creates a task for execution when an event is received from the cache. +func (c *Client) handler(ctx context.Context, event cloudevents.Event) error { + if event.DataContentType() != "application/json" { + return errors.New("event data has invalid content type, must be application/json") + } var data map[string]interface{} if err := json.Unmarshal(event.Data(), &data); err != nil { - fmt.Printf("Got Data Error: %s\n", err.Error()) + return err + } + + cKey, ok := data[eventDataKey] + if !ok { + return errors.New("invalid event data key") + } + cacheKey, _ := cKey.(string) + + sCacheKey := strings.Split(cacheKey, ",") + if len(sCacheKey) == 0 { + return errors.New("cache key cannot be empty") + } + + key := sCacheKey[0] + + var namespace, scope string + if len(sCacheKey) > 1 { + namespace = sCacheKey[1] + } + if len(sCacheKey) > 2 { + scope = sCacheKey[2] + } + + // get event task template from storage + eventTask, err := c.storage.EventTask(ctx, key, namespace, scope) + if err != nil { + return err + } + + // add task to task queue + if err := c.enqueueTask(ctx, eventTask); err != nil { + return err + } + + return nil +} + +func (c *Client) enqueueTask(ctx context.Context, eventTask *taskpkg.EventTask) error { + // get predefined task definition from storage + task, err := c.storage.TaskTemplate(ctx, eventTask.TaskName) + if err != nil { + return err + } + + if task.RequestPolicy == "" { + return errors.New("event task must execute a policy") + } + + input, err := json.Marshal(eventTask) + if err != nil { + return errors.New("error marshaling input to JSON", err) + } + + task.ID = uuid.NewString() + task.State = taskpkg.Created + task.CreatedAt = time.Now() + task.Request = input + + if err := c.queue.Add(ctx, task); err != nil { + return errors.New("failed to create task", err) } - fmt.Printf("Got Data: %+v\n", data) return nil } diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 92be9ac1fde2b9badc82ad1a2c761035d49f36d0..329c91934f1b2d0f44d46d1f8081cade0e409732 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -22,6 +22,7 @@ type Storage interface { TaskTemplate(ctx context.Context, taskName string) (*Task, error) Task(ctx context.Context, taskID string) (*Task, error) TaskHistory(ctx context.Context, taskID string) (*Task, error) + EventTask(ctx context.Context, key, namespace, scope string) (*EventTask, error) } // Queue interface for retrieving, returning and removing tasks from Queue. diff --git a/internal/service/task/task.go b/internal/service/task/task.go index 8847b7d41dad525a5af9bc122ede658d44dc5569..ce52a2b85cf61b8973186a60c4a02ed2fe87c94d 100644 --- a/internal/service/task/task.go +++ b/internal/service/task/task.go @@ -45,6 +45,13 @@ type Task struct { FinishedAt time.Time `json:"finishedAt"` // FinishedAt specifies the time when the task is done. } +type EventTask struct { + Key string `json:"key"` + Namespace string `json:"namespace"` + Scope string `json:"scope"` + TaskName string +} + // CacheKey constructs the key for storing task result in the cache. func (t *Task) CacheKey() string { key := t.ID diff --git a/internal/service/task/taskfakes/fake_storage.go b/internal/service/task/taskfakes/fake_storage.go index db9918de0f7c0449193db1b43606a9b00d7fa809..3d2dd83b9c6849760a534341b5c7a3dd73748aa1 100644 --- a/internal/service/task/taskfakes/fake_storage.go +++ b/internal/service/task/taskfakes/fake_storage.go @@ -9,6 +9,22 @@ import ( ) type FakeStorage struct { + EventTaskStub func(context.Context, string, string, string) (*task.EventTask, error) + eventTaskMutex sync.RWMutex + eventTaskArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + } + eventTaskReturns struct { + result1 *task.EventTask + result2 error + } + eventTaskReturnsOnCall map[int]struct { + result1 *task.EventTask + result2 error + } TaskStub func(context.Context, string) (*task.Task, error) taskMutex sync.RWMutex taskArgsForCall []struct { @@ -55,6 +71,73 @@ type FakeStorage struct { invocationsMutex sync.RWMutex } +func (fake *FakeStorage) EventTask(arg1 context.Context, arg2 string, arg3 string, arg4 string) (*task.EventTask, error) { + fake.eventTaskMutex.Lock() + ret, specificReturn := fake.eventTaskReturnsOnCall[len(fake.eventTaskArgsForCall)] + fake.eventTaskArgsForCall = append(fake.eventTaskArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + }{arg1, arg2, arg3, arg4}) + stub := fake.EventTaskStub + fakeReturns := fake.eventTaskReturns + fake.recordInvocation("EventTask", []interface{}{arg1, arg2, arg3, arg4}) + fake.eventTaskMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) EventTaskCallCount() int { + fake.eventTaskMutex.RLock() + defer fake.eventTaskMutex.RUnlock() + return len(fake.eventTaskArgsForCall) +} + +func (fake *FakeStorage) EventTaskCalls(stub func(context.Context, string, string, string) (*task.EventTask, error)) { + fake.eventTaskMutex.Lock() + defer fake.eventTaskMutex.Unlock() + fake.EventTaskStub = stub +} + +func (fake *FakeStorage) EventTaskArgsForCall(i int) (context.Context, string, string, string) { + fake.eventTaskMutex.RLock() + defer fake.eventTaskMutex.RUnlock() + argsForCall := fake.eventTaskArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeStorage) EventTaskReturns(result1 *task.EventTask, result2 error) { + fake.eventTaskMutex.Lock() + defer fake.eventTaskMutex.Unlock() + fake.EventTaskStub = nil + fake.eventTaskReturns = struct { + result1 *task.EventTask + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) EventTaskReturnsOnCall(i int, result1 *task.EventTask, result2 error) { + fake.eventTaskMutex.Lock() + defer fake.eventTaskMutex.Unlock() + fake.EventTaskStub = nil + if fake.eventTaskReturnsOnCall == nil { + fake.eventTaskReturnsOnCall = make(map[int]struct { + result1 *task.EventTask + result2 error + }) + } + fake.eventTaskReturnsOnCall[i] = struct { + result1 *task.EventTask + result2 error + }{result1, result2} +} + func (fake *FakeStorage) Task(arg1 context.Context, arg2 string) (*task.Task, error) { fake.taskMutex.Lock() ret, specificReturn := fake.taskReturnsOnCall[len(fake.taskArgsForCall)] @@ -253,6 +336,8 @@ func (fake *FakeStorage) TaskTemplateReturnsOnCall(i int, result1 *task.Task, re func (fake *FakeStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.eventTaskMutex.RLock() + defer fake.eventTaskMutex.RUnlock() fake.taskMutex.RLock() defer fake.taskMutex.RUnlock() fake.taskHistoryMutex.RLock() diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 97e20c37cc520c2b91c45a557f5c17198e242ced..533b8e6aeb86ff37eeb734734f165581a319d039 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -22,9 +22,11 @@ const ( taskListQueue = "taskLists" taskListTemplates = "taskListTemplates" taskListHistory = "taskListHistory" + eventTasks = "eventTasks" ) type Storage struct { + eventTasks *mongo.Collection taskTemplates *mongo.Collection tasks *mongo.Collection tasksHistory *mongo.Collection @@ -35,6 +37,7 @@ type Storage struct { func New(db *mongo.Client) *Storage { return &Storage{ + eventTasks: db.Database(taskDB).Collection(eventTasks), taskTemplates: db.Database(taskDB).Collection(taskTemplates), tasks: db.Database(taskDB).Collection(taskQueue), tasksHistory: db.Database(taskDB).Collection(tasksHistory), @@ -357,3 +360,25 @@ func (s *Storage) TaskListHistory(ctx context.Context, taskListID string) (*task return &list, nil } + +func (s *Storage) EventTask(ctx context.Context, key, namespace, scope string) (*task.EventTask, error) { + result := s.eventTasks.FindOne(ctx, bson.M{ + "key": key, + "namespace": namespace, + "scope": scope, + }) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "eventTask not found") + } + return nil, result.Err() + } + + var eventTask task.EventTask + if err := result.Decode(&eventTask); err != nil { + return nil, err + } + + return &eventTask, nil +}