diff --git a/cmd/task/main.go b/cmd/task/main.go index 23eaec99e7c4429bdbdd5bc17c94c870c7cd962f..9a2c3d90fec79faf6f355f792d5630e4b19da0f1 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -32,6 +32,7 @@ import ( goatask "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task" goatasklist "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task_list" "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/cache" + "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/event" "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/policy" "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/config" "gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/executor" @@ -91,6 +92,17 @@ func main() { // create cache client cache := cache.New(cfg.Cache.Addr, cache.WithHTTPClient(oauthClient)) + var events *event.Client + if cfg.Nats.Addr != "" { + events, err = event.New(cfg.Nats.Addr, cfg.Nats.Subject) + if err != nil { + logger.Fatal("failed to create events client", zap.Error(err)) + } + defer events.Close(context.Background()) //nolint:errcheck + } else { + logger.Info("task service is not able to subscribe for cache events") + } + // create task executor executor := executor.New( storage, @@ -213,6 +225,11 @@ func main() { g.Go(func() error { return listExecutor.Start(ctx) }) + if events != nil { + g.Go(func() error { + return events.Start(ctx) + }) + } if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } diff --git a/internal/clients/event/client.go b/internal/clients/event/client.go new file mode 100644 index 0000000000000000000000000000000000000000..9c5653d22543dc7eaa16a5a932d16a0709c57833 --- /dev/null +++ b/internal/clients/event/client.go @@ -0,0 +1,60 @@ +package event + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/cloudevents/sdk-go/protocol/nats/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +type Client struct { + consumer *nats.Consumer + events cloudevents.Client +} + +func New(addr, subject string) (*Client, error) { + // create cloudevents NATS consumer + // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol + c, err := nats.NewConsumer(addr, subject, nats.NatsOptions()) + if err != nil { + return nil, err + } + + e, err := cloudevents.NewClient(c) + if err != nil { + return nil, err + } + + return &Client{ + consumer: c, + events: e, + }, nil +} + +func (c *Client) Start(ctx context.Context) error { + for { + if err := c.events.StartReceiver(ctx, handler); err != nil { + return err + } + } +} + +func (c *Client) Close(ctx context.Context) error { + return c.consumer.Close(ctx) +} + +// handler is an example implementation. +// Implementation will be done in https://gitlab.eclipse.org/eclipse/xfsc/tsa/task/-/issues/7 +func handler(_ context.Context, event cloudevents.Event) error { + fmt.Printf("Got Event Context: %+v\n", event.Context) + + var data map[string]interface{} + if err := json.Unmarshal(event.Data(), &data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Got Data: %+v\n", data) + + return nil +} diff --git a/internal/config/config.go b/internal/config/config.go index baf99e58046ea65d99026f45a731999db4414c6f..dc187343e560d6cc21c2e15ca3bb0ae6408e28e9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ type Config struct { Cache cacheConfig Metrics metricsConfig OAuth oauthConfig + Nats natsConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -64,3 +65,8 @@ type oauthConfig struct { ClientSecret string `envconfig:"OAUTH_CLIENT_SECRET"` TokenURL string `envconfig:"OAUTH_TOKEN_URL"` } + +type natsConfig struct { + Addr string `envconfig:"NATS_ADDR"` + Subject string `envconfig:"NATS_SUBJECT" default:"external"` +}