Skip to content
Snippets Groups Projects
Commit f46a6add authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

Subscribe for NATS cache events

parent 858bfc25
No related branches found
No related tags found
No related merge requests found
......@@ -32,6 +32,7 @@ import (
goatask "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task"
goatasklist "gitlab.eclipse.org/eclipse/xfsc/tsa/task/gen/task_list"
"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/cache"
"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/event"
"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/clients/policy"
"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/config"
"gitlab.eclipse.org/eclipse/xfsc/tsa/task/internal/executor"
......@@ -91,6 +92,17 @@ func main() {
// create cache client
cache := cache.New(cfg.Cache.Addr, cache.WithHTTPClient(oauthClient))
var events *event.Client
if cfg.Nats.Addr != "" {
events, err = event.New(cfg.Nats.Addr, cfg.Nats.Subject)
if err != nil {
logger.Fatal("failed to create events client", zap.Error(err))
}
defer events.Close(context.Background()) //nolint:errcheck
} else {
logger.Info("task service is not able to subscribe for cache events")
}
// create task executor
executor := executor.New(
storage,
......@@ -213,6 +225,11 @@ func main() {
g.Go(func() error {
return listExecutor.Start(ctx)
})
if events != nil {
g.Go(func() error {
return events.Start(ctx)
})
}
if err := g.Wait(); err != nil {
logger.Error("run group stopped", zap.Error(err))
}
......
package event
import (
"context"
"encoding/json"
"fmt"
"github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
type Client struct {
consumer *nats.Consumer
events cloudevents.Client
}
func New(addr, subject string) (*Client, error) {
// create cloudevents NATS consumer
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
c, err := nats.NewConsumer(addr, subject, nats.NatsOptions())
if err != nil {
return nil, err
}
e, err := cloudevents.NewClient(c)
if err != nil {
return nil, err
}
return &Client{
consumer: c,
events: e,
}, nil
}
func (c *Client) Start(ctx context.Context) error {
for {
if err := c.events.StartReceiver(ctx, handler); err != nil {
return err
}
}
}
func (c *Client) Close(ctx context.Context) error {
return c.consumer.Close(ctx)
}
// handler is an example implementation.
// Implementation will be done in https://gitlab.eclipse.org/eclipse/xfsc/tsa/task/-/issues/7
func handler(_ context.Context, event cloudevents.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context)
var data map[string]interface{}
if err := json.Unmarshal(event.Data(), &data); err != nil {
fmt.Printf("Got Data Error: %s\n", err.Error())
}
fmt.Printf("Got Data: %+v\n", data)
return nil
}
......@@ -12,6 +12,7 @@ type Config struct {
Cache cacheConfig
Metrics metricsConfig
OAuth oauthConfig
Nats natsConfig
LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
}
......@@ -64,3 +65,8 @@ type oauthConfig struct {
ClientSecret string `envconfig:"OAUTH_CLIENT_SECRET"`
TokenURL string `envconfig:"OAUTH_TOKEN_URL"`
}
type natsConfig struct {
Addr string `envconfig:"NATS_ADDR"`
Subject string `envconfig:"NATS_SUBJECT" default:"external"`
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment