diff --git a/cmd/cache/main.go b/cmd/cache/main.go index 9231fc18ebc0a03411c4b6db945d2c730c64e430..a53f7ad2e1ff8c68c393e71e95e53cfc9c155c3a 100644 --- a/cmd/cache/main.go +++ b/cmd/cache/main.go @@ -7,8 +7,6 @@ import ( "net/http" "time" - "github.com/cloudevents/sdk-go/protocol/nats/v2" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -50,22 +48,9 @@ func main() { // create redis client redis := redis.New(cfg.Redis.Addr, cfg.Redis.User, cfg.Redis.Pass, cfg.Redis.DB, cfg.Redis.TTL) - // create cloudevents nats sender - // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol - sender, err := nats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, nats.NatsOptions()) - if err != nil { - log.Fatalf("failed to create nats sender, %v", err) - } - defer sender.Close(context.Background()) //nolint:errcheck - - // create cloudevents client - eventsClient, err := cloudevents.NewClient(sender) - if err != nil { - log.Fatalf("failed to create cloudevents client, %v", err) - } - // create event client - event := event.New(eventsClient) + events := event.New(cfg.Nats.Addr, cfg.Nats.Subject) + defer events.CLose(context.Background()) // create services var ( @@ -73,7 +58,7 @@ func main() { healthSvc goahealth.Service ) { - cacheSvc = cache.New(redis, event, logger) + cacheSvc = cache.New(redis, events, logger) healthSvc = health.New() } diff --git a/internal/clients/event/client.go b/internal/clients/event/client.go index 3a0a0225922d83b3afb6adb1484e5d1626bdea05..7baf3c193e09ea17da39107d428982a48d48b1fa 100644 --- a/internal/clients/event/client.go +++ b/internal/clients/event/client.go @@ -3,8 +3,11 @@ package event import ( "context" "fmt" + + "log" "time" + "github.com/cloudevents/sdk-go/protocol/nats/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/event" "github.com/google/uuid" @@ -13,6 +16,7 @@ import ( const eventType = "cache_set_event" type Client struct { + sender *nats.Sender events cloudevents.Client } @@ -20,12 +24,28 @@ type Data struct { Key string `json:"key"` } -func New(eventClient cloudevents.Client) *Client { - return &Client{events: eventClient} +func New(addr, subject string) *Client { + // create cloudevents nats sender + // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol + sender, err := nats.NewSender(addr, subject, nats.NatsOptions()) + if err != nil { + log.Fatalf("failed to create nats sender, %v", err) + } + + // create cloudevents client + eventsClient, err := cloudevents.NewClient(sender) + if err != nil { + log.Fatalf("failed to create cloudevents client, %v", err) + } + + return &Client{ + sender: sender, + events: eventsClient, + } } -func (c *Client) Send(ctx context.Context, key string, data interface{}) error { - e, err := newEvent(key, data) +func (c *Client) Send(ctx context.Context, key string) error { + e, err := newEvent(key) if err != nil { return err } @@ -38,7 +58,11 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error { return nil } -func newEvent(key string, data interface{}) (*event.Event, error) { +func (c *Client) CLose(ctx context.Context) error { + return c.sender.Close(ctx) +} + +func newEvent(key string) (*event.Event, error) { e := cloudevents.NewEvent() e.SetID(uuid.NewString()) // required field e.SetSource("cache") // required field diff --git a/internal/service/cache/cachefakes/fake_queue.go b/internal/service/cache/cachefakes/fake_events.go similarity index 69% rename from internal/service/cache/cachefakes/fake_queue.go rename to internal/service/cache/cachefakes/fake_events.go index e2e47ea2555a83e97e4238758536d4f54db1887c..ae04df06db658092d83df0ba9c9d506fccc2d11a 100644 --- a/internal/service/cache/cachefakes/fake_queue.go +++ b/internal/service/cache/cachefakes/fake_events.go @@ -8,13 +8,12 @@ import ( "code.vereign.com/gaiax/tsa/cache/internal/service/cache" ) -type FakeQueue struct { - SendStub func(context.Context, string, interface{}) error +type FakeEvents struct { + SendStub func(context.Context, string) error sendMutex sync.RWMutex sendArgsForCall []struct { arg1 context.Context arg2 string - arg3 interface{} } sendReturns struct { result1 error @@ -26,20 +25,19 @@ type FakeQueue struct { invocationsMutex sync.RWMutex } -func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{}) error { +func (fake *FakeEvents) Send(arg1 context.Context, arg2 string) error { fake.sendMutex.Lock() ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)] fake.sendArgsForCall = append(fake.sendArgsForCall, struct { arg1 context.Context arg2 string - arg3 interface{} - }{arg1, arg2, arg3}) + }{arg1, arg2}) stub := fake.SendStub fakeReturns := fake.sendReturns - fake.recordInvocation("Send", []interface{}{arg1, arg2, arg3}) + fake.recordInvocation("Send", []interface{}{arg1, arg2}) fake.sendMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -47,26 +45,26 @@ func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{}) return fakeReturns.result1 } -func (fake *FakeQueue) SendCallCount() int { +func (fake *FakeEvents) SendCallCount() int { fake.sendMutex.RLock() defer fake.sendMutex.RUnlock() return len(fake.sendArgsForCall) } -func (fake *FakeQueue) SendCalls(stub func(context.Context, string, interface{}) error) { +func (fake *FakeEvents) SendCalls(stub func(context.Context, string) error) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = stub } -func (fake *FakeQueue) SendArgsForCall(i int) (context.Context, string, interface{}) { +func (fake *FakeEvents) SendArgsForCall(i int) (context.Context, string) { fake.sendMutex.RLock() defer fake.sendMutex.RUnlock() argsForCall := fake.sendArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeQueue) SendReturns(result1 error) { +func (fake *FakeEvents) SendReturns(result1 error) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = nil @@ -75,7 +73,7 @@ func (fake *FakeQueue) SendReturns(result1 error) { }{result1} } -func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) { +func (fake *FakeEvents) SendReturnsOnCall(i int, result1 error) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = nil @@ -89,7 +87,7 @@ func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeQueue) Invocations() map[string][][]interface{} { +func (fake *FakeEvents) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.sendMutex.RLock() @@ -101,7 +99,7 @@ func (fake *FakeQueue) Invocations() map[string][][]interface{} { return copiedInvocations } -func (fake *FakeQueue) recordInvocation(key string, args []interface{}) { +func (fake *FakeEvents) recordInvocation(key string, args []interface{}) { fake.invocationsMutex.Lock() defer fake.invocationsMutex.Unlock() if fake.invocations == nil { @@ -113,4 +111,4 @@ func (fake *FakeQueue) recordInvocation(key string, args []interface{}) { fake.invocations[key] = append(fake.invocations[key], args) } -var _ cache.Queue = new(FakeQueue) +var _ cache.Events = new(FakeEvents) diff --git a/internal/service/cache/service.go b/internal/service/cache/service.go index d1bdc4f1b4f46e68dea3be63785e43d4e332b9b5..60eef6b761d0938298a110b643c346d42fdf2185 100644 --- a/internal/service/cache/service.go +++ b/internal/service/cache/service.go @@ -12,27 +12,27 @@ import ( ) //go:generate counterfeiter . Cache -//go:generate counterfeiter . Queue +//go:generate counterfeiter . Events type Cache interface { Get(ctx context.Context, key string) ([]byte, error) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error } -type Queue interface { - Send(ctx context.Context, key string, value interface{}) error +type Events interface { + Send(ctx context.Context, key string) error } type Service struct { cache Cache - queue Queue + events Events logger *zap.Logger } -func New(cache Cache, queue Queue, logger *zap.Logger) *Service { +func New(cache Cache, queue Events, logger *zap.Logger) *Service { return &Service{ cache: cache, - queue: queue, + events: queue, logger: logger, } } @@ -106,7 +106,7 @@ func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) e key := makeCacheKey(req.Key, req.Namespace, req.Scope) // send an event for the input - if err := s.queue.Send(ctx, key, req.Data); err != nil { + if err := s.events.Send(ctx, key); err != nil { logger.Error("error sending an event for external input", zap.Error(err)) return errors.New("error sending an event for external input", err) } diff --git a/internal/service/cache/service_test.go b/internal/service/cache/service_test.go index 6d4d4aa253447222de841de5ed53881d5b96af79..c9e4c0b4137ccf6a527a908e80205a94b902d13c 100644 --- a/internal/service/cache/service_test.go +++ b/internal/service/cache/service_test.go @@ -186,10 +186,10 @@ func TestService_Set(t *testing.T) { func TestService_SetExternal(t *testing.T) { tests := []struct { - name string - cache *cachefakes.FakeCache - queue *cachefakes.FakeQueue - req *goacache.CacheSetRequest + name string + cache *cachefakes.FakeCache + events *cachefakes.FakeEvents + req *goacache.CacheSetRequest res interface{} errkind errors.Kind @@ -224,14 +224,14 @@ func TestService_SetExternal(t *testing.T) { return nil }, }, - queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error { + events: &cachefakes.FakeEvents{SendStub: func(ctx context.Context, s string) error { return errors.New(errors.Unknown, "failed to send event") }}, errkind: errors.Unknown, errtext: "failed to send event", }, { - name: "successfully set value in cache and send an event to queue", + name: "successfully set value in cache and send an event to events", req: &goacache.CacheSetRequest{ Key: "key", Namespace: ptr.String("namespace"), @@ -243,7 +243,7 @@ func TestService_SetExternal(t *testing.T) { return nil }, }, - queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error { + events: &cachefakes.FakeEvents{SendStub: func(ctx context.Context, s string) error { return nil }}, errtext: "", @@ -252,7 +252,7 @@ func TestService_SetExternal(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - svc := cache.New(test.cache, test.queue, zap.NewNop()) + svc := cache.New(test.cache, test.events, zap.NewNop()) err := svc.SetExternal(context.Background(), test.req) if err == nil { assert.Empty(t, test.errtext)