From f46a6add991a8d7fefd03f69146c18066d9ac118 Mon Sep 17 00:00:00 2001
From: Yordan Kinkov <yordan.kinkov@vereign.com>
Date: Thu, 16 Nov 2023 15:28:55 +0200
Subject: [PATCH] Subscribe for NATS cache events

---
 cmd/task/main.go                 | 17 +++++++++
 internal/clients/event/client.go | 60 ++++++++++++++++++++++++++++++++
 internal/config/config.go        |  6 ++++
 3 files changed, 83 insertions(+)
 create mode 100644 internal/clients/event/client.go

diff --git a/cmd/task/main.go b/cmd/task/main.go
index 23eaec9..9a2c3d9 100644
--- a/cmd/task/main.go
+++ b/cmd/task/main.go
@@ -32,6 +32,7 @@ import (
 	goatask "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task"
 	goatasklist "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task_list"
 	"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/cache"
+	"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/event"
 	"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/policy"
 	"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/config"
 	"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/executor"
@@ -91,6 +92,17 @@ func main() {
 	// create cache client
 	cache := cache.New(cfg.Cache.Addr, cache.WithHTTPClient(oauthClient))
 
+	var events *event.Client
+	if cfg.Nats.Addr != "" {
+		events, err = event.New(cfg.Nats.Addr, cfg.Nats.Subject)
+		if err != nil {
+			logger.Fatal("failed to create events client", zap.Error(err))
+		}
+		defer events.Close(context.Background()) //nolint:errcheck
+	} else {
+		logger.Info("task service is not able to subscribe for cache events")
+	}
+
 	// create task executor
 	executor := executor.New(
 		storage,
@@ -213,6 +225,11 @@ func main() {
 	g.Go(func() error {
 		return listExecutor.Start(ctx)
 	})
+	if events != nil {
+		g.Go(func() error {
+			return events.Start(ctx)
+		})
+	}
 	if err := g.Wait(); err != nil {
 		logger.Error("run group stopped", zap.Error(err))
 	}
diff --git a/internal/clients/event/client.go b/internal/clients/event/client.go
new file mode 100644
index 0000000..9c5653d
--- /dev/null
+++ b/internal/clients/event/client.go
@@ -0,0 +1,60 @@
+package event
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"github.com/cloudevents/sdk-go/protocol/nats/v2"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+)
+
+type Client struct {
+	consumer *nats.Consumer
+	events   cloudevents.Client
+}
+
+func New(addr, subject string) (*Client, error) {
+	// create cloudevents NATS consumer
+	// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
+	c, err := nats.NewConsumer(addr, subject, nats.NatsOptions())
+	if err != nil {
+		return nil, err
+	}
+
+	e, err := cloudevents.NewClient(c)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Client{
+		consumer: c,
+		events:   e,
+	}, nil
+}
+
+func (c *Client) Start(ctx context.Context) error {
+	for {
+		if err := c.events.StartReceiver(ctx, handler); err != nil {
+			return err
+		}
+	}
+}
+
+func (c *Client) Close(ctx context.Context) error {
+	return c.consumer.Close(ctx)
+}
+
+// handler is an example implementation.
+// Implementation will be done in https://gitlab.eclipse.org/eclipse/xfsc/tsa/task/-/issues/7
+func handler(_ context.Context, event cloudevents.Event) error {
+	fmt.Printf("Got Event Context: %+v\n", event.Context)
+
+	var data map[string]interface{}
+	if err := json.Unmarshal(event.Data(), &data); err != nil {
+		fmt.Printf("Got Data Error: %s\n", err.Error())
+	}
+	fmt.Printf("Got Data: %+v\n", data)
+
+	return nil
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index baf99e5..dc18734 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -12,6 +12,7 @@ type Config struct {
 	Cache        cacheConfig
 	Metrics      metricsConfig
 	OAuth        oauthConfig
+	Nats         natsConfig
 
 	LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
 }
@@ -64,3 +65,8 @@ type oauthConfig struct {
 	ClientSecret string `envconfig:"OAUTH_CLIENT_SECRET"`
 	TokenURL     string `envconfig:"OAUTH_TOKEN_URL"`
 }
+
+type natsConfig struct {
+	Addr    string `envconfig:"NATS_ADDR"`
+	Subject string `envconfig:"NATS_SUBJECT" default:"external"`
+}
-- 
GitLab