diff --git a/cmd/cache/main.go b/cmd/cache/main.go index 0db3723ab09cb4cf45928bcbffbdd264324875e6..74c032b869ba5eed5ac7f609cada067d90a45376 100644 --- a/cmd/cache/main.go +++ b/cmd/cache/main.go @@ -7,6 +7,8 @@ import ( "net/http" "time" + cenats "github.com/cloudevents/sdk-go/protocol/nats/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -20,6 +22,7 @@ import ( goahealthsrv "code.vereign.com/gaiax/tsa/cache/gen/http/health/server" goaopenapisrv "code.vereign.com/gaiax/tsa/cache/gen/http/openapi/server" "code.vereign.com/gaiax/tsa/cache/gen/openapi" + "code.vereign.com/gaiax/tsa/cache/internal/clients/queue" "code.vereign.com/gaiax/tsa/cache/internal/clients/redis" "code.vereign.com/gaiax/tsa/cache/internal/config" "code.vereign.com/gaiax/tsa/cache/internal/service" @@ -47,13 +50,30 @@ func main() { // create redis client redis := redis.New(cfg.Redis.Addr, cfg.Redis.User, cfg.Redis.Pass, cfg.Redis.DB, cfg.Redis.TTL) + // create cloudevents nats sender + // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol + sender, err := cenats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, cenats.NatsOptions()) + if err != nil { + log.Fatalf("failed to create nats sender, %v", err) + } + defer sender.Close(context.Background()) //nolint:errcheck + + // create cloudevents client + eventsClient, err := cloudevents.NewClient(sender) + if err != nil { + log.Fatalf("failed to create cloudevents client, %v", err) + } + + // create queue client + queue := queue.New(eventsClient) + // create services var ( cacheSvc goacache.Service healthSvc goahealth.Service ) { - cacheSvc = cache.New(redis, logger) + cacheSvc = cache.New(redis, queue, logger) healthSvc = health.New() } diff --git a/design/design.go b/design/design.go index df4c8c84acb64044fe38b34e5487573297d43302..5d526681e7f636984e8229fb5ffd58e2cbb519d5 100644 --- a/design/design.go +++ b/design/design.go @@ -88,6 +88,30 @@ var _ = Service("cache", func() { Response(StatusCreated) }) }) + + Method("SetExternal", func() { + Description("Set an external JSON value in the cache and provide an event for the input.") + + Payload(CacheSetRequest) + Result(Empty) + + HTTP(func() { + POST("/v1/external/cache") + + Header("key:x-cache-key", String, "Cache entry key", func() { + Example("did:web:example.com") + }) + Header("namespace:x-cache-namespace", String, "Cache entry namespace", func() { + Example("Login") + }) + Header("scope:x-cache-scope", String, "Cache entry scope", func() { + Example("administration") + }) + Body("data") + + Response(StatusOK) + }) + }) }) var _ = Service("openapi", func() { diff --git a/internal/clients/queue/client.go b/internal/clients/queue/client.go new file mode 100644 index 0000000000000000000000000000000000000000..5a424825f32cbb6e673fb7b1bd6c72c29ffb0fcf --- /dev/null +++ b/internal/clients/queue/client.go @@ -0,0 +1,60 @@ +package queue + +import ( + "context" + "fmt" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/google/uuid" + + "code.vereign.com/gaiax/tsa/golib/errors" +) + +const eventType = "External cache input" + +type Client struct { + eventClient cloudevents.Client +} + +type Data struct { + Key string `json:"key"` + Value string `json:"value"` +} + +func New(eventClient cloudevents.Client) *Client { + return &Client{eventClient: eventClient} +} + +func (c *Client) Send(ctx context.Context, key string, data interface{}) error { + e, err := newEvent(key, data) + if err != nil { + return err + } + + res := c.eventClient.Send(ctx, *e) + if cloudevents.IsUndelivered(res) { + return errors.New(fmt.Sprintf("failed to send event for key: %s, reason: %v", key, res)) + } + + return nil +} + +func newEvent(key string, data interface{}) (*event.Event, error) { + e := cloudevents.NewEvent() + e.SetID(uuid.NewString()) // required field + e.SetSource(eventType) // required field + e.SetType(eventType) // required field + e.SetTime(time.Now()) + + err := e.SetData(event.ApplicationJSON, &Data{ + Key: key, + Value: fmt.Sprintf("%v", data), + }) + if err != nil { + return nil, err + } + + return &e, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index cdcfee1f3b5454ec096f6e787dd4a5552dd53fd4..6800576cf0c74dc12eff92556d325721fa3de623 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import "time" type Config struct { HTTP httpConfig Redis redisConfig + Nats natsConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -24,3 +25,8 @@ type redisConfig struct { DB int `envconfig:"REDIS_DB" default:"0"` TTL time.Duration `envconfig:"REDIS_EXPIRATION"` // no default expiration, keys are set to live forever } + +type natsConfig struct { + Addr string `envconfig:"NATS_ADDR" required:"true"` + Subject string `envconfig:"NATS_SUBJECT" default:"external"` +} diff --git a/internal/service/cache/cachefakes/fake_queue.go b/internal/service/cache/cachefakes/fake_queue.go new file mode 100644 index 0000000000000000000000000000000000000000..e2e47ea2555a83e97e4238758536d4f54db1887c --- /dev/null +++ b/internal/service/cache/cachefakes/fake_queue.go @@ -0,0 +1,116 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package cachefakes + +import ( + "context" + "sync" + + "code.vereign.com/gaiax/tsa/cache/internal/service/cache" +) + +type FakeQueue struct { + SendStub func(context.Context, string, interface{}) error + sendMutex sync.RWMutex + sendArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 interface{} + } + sendReturns struct { + result1 error + } + sendReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{}) error { + fake.sendMutex.Lock() + ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)] + fake.sendArgsForCall = append(fake.sendArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 interface{} + }{arg1, arg2, arg3}) + stub := fake.SendStub + fakeReturns := fake.sendReturns + fake.recordInvocation("Send", []interface{}{arg1, arg2, arg3}) + fake.sendMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeQueue) SendCallCount() int { + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + return len(fake.sendArgsForCall) +} + +func (fake *FakeQueue) SendCalls(stub func(context.Context, string, interface{}) error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = stub +} + +func (fake *FakeQueue) SendArgsForCall(i int) (context.Context, string, interface{}) { + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + argsForCall := fake.sendArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeQueue) SendReturns(result1 error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = nil + fake.sendReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = nil + if fake.sendReturnsOnCall == nil { + fake.sendReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeQueue) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeQueue) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ cache.Queue = new(FakeQueue) diff --git a/internal/service/cache/service.go b/internal/service/cache/service.go index dddcc536b7fa04bbed72147d8a5fa93b200589e9..49280a8037d1bd3a3d43ff330460dafdd5f77e27 100644 --- a/internal/service/cache/service.go +++ b/internal/service/cache/service.go @@ -12,20 +12,27 @@ import ( ) //go:generate counterfeiter . Cache +//go:generate counterfeiter . Queue type Cache interface { Get(ctx context.Context, key string) ([]byte, error) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error } +type Queue interface { + Send(ctx context.Context, key string, value interface{}) error +} + type Service struct { cache Cache + queue Queue logger *zap.Logger } -func New(cache Cache, logger *zap.Logger) *Service { +func New(cache Cache, queue Queue, logger *zap.Logger) *Service { return &Service{ cache: cache, + queue: queue, logger: logger, } } @@ -38,16 +45,8 @@ func (s *Service) Get(ctx context.Context, req *cache.CacheGetRequest) (interfac return nil, errors.New(errors.BadRequest, "missing key") } - var namespace, scope string - if req.Namespace != nil { - namespace = *req.Namespace - } - if req.Scope != nil { - scope = *req.Scope - } - // create key from the input fields - key := makeCacheKey(req.Key, namespace, scope) + key := makeCacheKey(req.Key, req.Namespace, req.Scope) data, err := s.cache.Get(ctx, key) if err != nil { logger.Error("error getting value from cache", zap.String("key", key), zap.Error(err)) @@ -74,18 +73,10 @@ func (s *Service) Set(ctx context.Context, req *cache.CacheSetRequest) error { return errors.New(errors.BadRequest, "missing key") } - var namespace, scope string - if req.Namespace != nil { - namespace = *req.Namespace - } - if req.Scope != nil { - scope = *req.Scope - } - // TODO(kinkov): issue #3 - evaluate key metadata (key, namespace and scope) and set TTL over a policy execution // create key from the input fields - key := makeCacheKey(req.Key, namespace, scope) + key := makeCacheKey(req.Key, req.Namespace, req.Scope) // encode payload to json bytes for storing in cache value, err := json.Marshal(req.Data) if err != nil { @@ -101,13 +92,43 @@ func (s *Service) Set(ctx context.Context, req *cache.CacheSetRequest) error { return nil } -func makeCacheKey(key, namespace, scope string) string { +// SetExternal sets an external JSON value in the cache and provide an event for the input. +func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) error { + logger := s.logger.With(zap.String("operation", "setExternal")) + + // set value in cache + if err := s.Set(ctx, req); err != nil { + logger.Error("error setting external input in cache", zap.Error(err)) + return errors.New("error setting external input in cache", err) + } + + // create key from the input fields + key := makeCacheKey(req.Key, req.Namespace, req.Scope) + + // send an event for the input + if err := s.queue.Send(ctx, key, req.Data); err != nil { + logger.Error("error sending an event for external input", zap.Error(err)) + return errors.New("error sending an event for external input", err) + } + + return nil +} + +func makeCacheKey(key string, namespace, scope *string) string { k := key - if namespace != "" { - k += "," + namespace + var n, s string + if namespace != nil { + n = *namespace + } + if scope != nil { + s = *scope + } + + if n != "" { + k += "," + n } - if scope != "" { - k += "," + scope + if s != "" { + k += "," + s } return k } diff --git a/internal/service/cache/service_test.go b/internal/service/cache/service_test.go index 4e09e0314fde2420325949fbbd17e6e3f8d4d338..6d4d4aa253447222de841de5ed53881d5b96af79 100644 --- a/internal/service/cache/service_test.go +++ b/internal/service/cache/service_test.go @@ -16,7 +16,7 @@ import ( ) func TestNew(t *testing.T) { - svc := cache.New(nil, zap.NewNop()) + svc := cache.New(nil, nil, zap.NewNop()) assert.Implements(t, (*goacache.Service)(nil), svc) } @@ -100,7 +100,7 @@ func TestService_Get(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - svc := cache.New(test.cache, zap.NewNop()) + svc := cache.New(test.cache, nil, zap.NewNop()) res, err := svc.Get(context.Background(), test.req) if err == nil { assert.Empty(t, test.errtext) @@ -169,7 +169,7 @@ func TestService_Set(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - svc := cache.New(test.cache, zap.NewNop()) + svc := cache.New(test.cache, nil, zap.NewNop()) err := svc.Set(context.Background(), test.req) if err == nil { assert.Empty(t, test.errtext) @@ -183,3 +183,86 @@ func TestService_Set(t *testing.T) { }) } } + +func TestService_SetExternal(t *testing.T) { + tests := []struct { + name string + cache *cachefakes.FakeCache + queue *cachefakes.FakeQueue + req *goacache.CacheSetRequest + + res interface{} + errkind errors.Kind + errtext string + }{ + { + name: "error setting external input in cache", + req: &goacache.CacheSetRequest{ + Key: "key", + Namespace: ptr.String("namespace"), + Scope: ptr.String("scope"), + Data: map[string]interface{}{"test": "value"}, + }, + cache: &cachefakes.FakeCache{ + SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return errors.New(errors.Timeout, "some error") + }, + }, + errkind: errors.Timeout, + errtext: "some error", + }, + { + name: "error sending an event for external input", + req: &goacache.CacheSetRequest{ + Key: "key", + Namespace: ptr.String("namespace"), + Scope: ptr.String("scope"), + Data: map[string]interface{}{"test": "value"}, + }, + cache: &cachefakes.FakeCache{ + SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return nil + }, + }, + queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error { + return errors.New(errors.Unknown, "failed to send event") + }}, + errkind: errors.Unknown, + errtext: "failed to send event", + }, + { + name: "successfully set value in cache and send an event to queue", + req: &goacache.CacheSetRequest{ + Key: "key", + Namespace: ptr.String("namespace"), + Scope: ptr.String("scope"), + Data: map[string]interface{}{"test": "value"}, + }, + cache: &cachefakes.FakeCache{ + SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return nil + }, + }, + queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error { + return nil + }}, + errtext: "", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + svc := cache.New(test.cache, test.queue, zap.NewNop()) + err := svc.SetExternal(context.Background(), test.req) + if err == nil { + assert.Empty(t, test.errtext) + } else { + assert.Error(t, err) + e, ok := err.(*errors.Error) + assert.True(t, ok) + assert.Equal(t, test.errkind, e.Kind) + assert.Contains(t, e.Error(), test.errtext) + } + }) + } +}