Skip to content
Snippets Groups Projects
Commit b604d36b authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#19 endpoint for external cache input

parent be3d3a16
Branches
Tags
1 merge request!8Endpoint for external cache input
Pipeline #52051 failed
......@@ -7,6 +7,8 @@ import (
"net/http"
"time"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
......@@ -20,6 +22,7 @@ import (
goahealthsrv "code.vereign.com/gaiax/tsa/cache/gen/http/health/server"
goaopenapisrv "code.vereign.com/gaiax/tsa/cache/gen/http/openapi/server"
"code.vereign.com/gaiax/tsa/cache/gen/openapi"
"code.vereign.com/gaiax/tsa/cache/internal/clients/queue"
"code.vereign.com/gaiax/tsa/cache/internal/clients/redis"
"code.vereign.com/gaiax/tsa/cache/internal/config"
"code.vereign.com/gaiax/tsa/cache/internal/service"
......@@ -47,13 +50,30 @@ func main() {
// create redis client
redis := redis.New(cfg.Redis.Addr, cfg.Redis.User, cfg.Redis.Pass, cfg.Redis.DB, cfg.Redis.TTL)
// create cloudevents nats sender
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
sender, err := cenats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, cenats.NatsOptions())
if err != nil {
log.Fatalf("failed to create nats sender, %v", err)
}
defer sender.Close(context.Background()) //nolint:errcheck
// create cloudevents client
eventsClient, err := cloudevents.NewClient(sender)
if err != nil {
log.Fatalf("failed to create cloudevents client, %v", err)
}
// create queue client
queue := queue.New(eventsClient)
// create services
var (
cacheSvc goacache.Service
healthSvc goahealth.Service
)
{
cacheSvc = cache.New(redis, logger)
cacheSvc = cache.New(redis, queue, logger)
healthSvc = health.New()
}
......
......@@ -88,6 +88,30 @@ var _ = Service("cache", func() {
Response(StatusCreated)
})
})
Method("SetExternal", func() {
Description("Set an external JSON value in the cache and provide an event for the input.")
Payload(CacheSetRequest)
Result(Empty)
HTTP(func() {
POST("/v1/external/cache")
Header("key:x-cache-key", String, "Cache entry key", func() {
Example("did:web:example.com")
})
Header("namespace:x-cache-namespace", String, "Cache entry namespace", func() {
Example("Login")
})
Header("scope:x-cache-scope", String, "Cache entry scope", func() {
Example("administration")
})
Body("data")
Response(StatusOK)
})
})
})
var _ = Service("openapi", func() {
......
package queue
import (
"context"
"fmt"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
"code.vereign.com/gaiax/tsa/golib/errors"
)
const eventType = "External cache input"
type Client struct {
eventClient cloudevents.Client
}
type Data struct {
Key string `json:"key"`
Value string `json:"value"`
}
func New(eventClient cloudevents.Client) *Client {
return &Client{eventClient: eventClient}
}
func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
e, err := newEvent(key, data)
if err != nil {
return err
}
res := c.eventClient.Send(ctx, *e)
if cloudevents.IsUndelivered(res) {
return errors.New(fmt.Sprintf("failed to send event for key: %s, reason: %v", key, res))
}
return nil
}
func newEvent(key string, data interface{}) (*event.Event, error) {
e := cloudevents.NewEvent()
e.SetID(uuid.NewString()) // required field
e.SetSource(eventType) // required field
e.SetType(eventType) // required field
e.SetTime(time.Now())
err := e.SetData(event.ApplicationJSON, &Data{
Key: key,
Value: fmt.Sprintf("%v", data),
})
if err != nil {
return nil, err
}
return &e, nil
}
......@@ -5,6 +5,7 @@ import "time"
type Config struct {
HTTP httpConfig
Redis redisConfig
Nats natsConfig
LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"`
}
......@@ -24,3 +25,8 @@ type redisConfig struct {
DB int `envconfig:"REDIS_DB" default:"0"`
TTL time.Duration `envconfig:"REDIS_EXPIRATION"` // no default expiration, keys are set to live forever
}
type natsConfig struct {
Addr string `envconfig:"NATS_ADDR" required:"true"`
Subject string `envconfig:"NATS_SUBJECT" default:"external"`
}
// Code generated by counterfeiter. DO NOT EDIT.
package cachefakes
import (
"context"
"sync"
"code.vereign.com/gaiax/tsa/cache/internal/service/cache"
)
type FakeQueue struct {
SendStub func(context.Context, string, interface{}) error
sendMutex sync.RWMutex
sendArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 interface{}
}
sendReturns struct {
result1 error
}
sendReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{}) error {
fake.sendMutex.Lock()
ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)]
fake.sendArgsForCall = append(fake.sendArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 interface{}
}{arg1, arg2, arg3})
stub := fake.SendStub
fakeReturns := fake.sendReturns
fake.recordInvocation("Send", []interface{}{arg1, arg2, arg3})
fake.sendMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeQueue) SendCallCount() int {
fake.sendMutex.RLock()
defer fake.sendMutex.RUnlock()
return len(fake.sendArgsForCall)
}
func (fake *FakeQueue) SendCalls(stub func(context.Context, string, interface{}) error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = stub
}
func (fake *FakeQueue) SendArgsForCall(i int) (context.Context, string, interface{}) {
fake.sendMutex.RLock()
defer fake.sendMutex.RUnlock()
argsForCall := fake.sendArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeQueue) SendReturns(result1 error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = nil
fake.sendReturns = struct {
result1 error
}{result1}
}
func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = nil
if fake.sendReturnsOnCall == nil {
fake.sendReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.sendReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeQueue) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.sendMutex.RLock()
defer fake.sendMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeQueue) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ cache.Queue = new(FakeQueue)
......@@ -12,20 +12,27 @@ import (
)
//go:generate counterfeiter . Cache
//go:generate counterfeiter . Queue
type Cache interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
type Queue interface {
Send(ctx context.Context, key string, value interface{}) error
}
type Service struct {
cache Cache
queue Queue
logger *zap.Logger
}
func New(cache Cache, logger *zap.Logger) *Service {
func New(cache Cache, queue Queue, logger *zap.Logger) *Service {
return &Service{
cache: cache,
queue: queue,
logger: logger,
}
}
......@@ -38,16 +45,8 @@ func (s *Service) Get(ctx context.Context, req *cache.CacheGetRequest) (interfac
return nil, errors.New(errors.BadRequest, "missing key")
}
var namespace, scope string
if req.Namespace != nil {
namespace = *req.Namespace
}
if req.Scope != nil {
scope = *req.Scope
}
// create key from the input fields
key := makeCacheKey(req.Key, namespace, scope)
key := makeCacheKey(req.Key, req.Namespace, req.Scope)
data, err := s.cache.Get(ctx, key)
if err != nil {
logger.Error("error getting value from cache", zap.String("key", key), zap.Error(err))
......@@ -74,18 +73,10 @@ func (s *Service) Set(ctx context.Context, req *cache.CacheSetRequest) error {
return errors.New(errors.BadRequest, "missing key")
}
var namespace, scope string
if req.Namespace != nil {
namespace = *req.Namespace
}
if req.Scope != nil {
scope = *req.Scope
}
// TODO(kinkov): issue #3 - evaluate key metadata (key, namespace and scope) and set TTL over a policy execution
// create key from the input fields
key := makeCacheKey(req.Key, namespace, scope)
key := makeCacheKey(req.Key, req.Namespace, req.Scope)
// encode payload to json bytes for storing in cache
value, err := json.Marshal(req.Data)
if err != nil {
......@@ -101,13 +92,43 @@ func (s *Service) Set(ctx context.Context, req *cache.CacheSetRequest) error {
return nil
}
func makeCacheKey(key, namespace, scope string) string {
// SetExternal sets an external JSON value in the cache and provide an event for the input.
func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) error {
logger := s.logger.With(zap.String("operation", "setExternal"))
// set value in cache
if err := s.Set(ctx, req); err != nil {
logger.Error("error setting external input in cache", zap.Error(err))
return errors.New("error setting external input in cache", err)
}
// create key from the input fields
key := makeCacheKey(req.Key, req.Namespace, req.Scope)
// send an event for the input
if err := s.queue.Send(ctx, key, req.Data); err != nil {
logger.Error("error sending an event for external input", zap.Error(err))
return errors.New("error sending an event for external input", err)
}
return nil
}
func makeCacheKey(key string, namespace, scope *string) string {
k := key
if namespace != "" {
k += "," + namespace
var n, s string
if namespace != nil {
n = *namespace
}
if scope != nil {
s = *scope
}
if n != "" {
k += "," + n
}
if scope != "" {
k += "," + scope
if s != "" {
k += "," + s
}
return k
}
......@@ -16,7 +16,7 @@ import (
)
func TestNew(t *testing.T) {
svc := cache.New(nil, zap.NewNop())
svc := cache.New(nil, nil, zap.NewNop())
assert.Implements(t, (*goacache.Service)(nil), svc)
}
......@@ -100,7 +100,7 @@ func TestService_Get(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc := cache.New(test.cache, zap.NewNop())
svc := cache.New(test.cache, nil, zap.NewNop())
res, err := svc.Get(context.Background(), test.req)
if err == nil {
assert.Empty(t, test.errtext)
......@@ -169,7 +169,7 @@ func TestService_Set(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc := cache.New(test.cache, zap.NewNop())
svc := cache.New(test.cache, nil, zap.NewNop())
err := svc.Set(context.Background(), test.req)
if err == nil {
assert.Empty(t, test.errtext)
......@@ -183,3 +183,86 @@ func TestService_Set(t *testing.T) {
})
}
}
func TestService_SetExternal(t *testing.T) {
tests := []struct {
name string
cache *cachefakes.FakeCache
queue *cachefakes.FakeQueue
req *goacache.CacheSetRequest
res interface{}
errkind errors.Kind
errtext string
}{
{
name: "error setting external input in cache",
req: &goacache.CacheSetRequest{
Key: "key",
Namespace: ptr.String("namespace"),
Scope: ptr.String("scope"),
Data: map[string]interface{}{"test": "value"},
},
cache: &cachefakes.FakeCache{
SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return errors.New(errors.Timeout, "some error")
},
},
errkind: errors.Timeout,
errtext: "some error",
},
{
name: "error sending an event for external input",
req: &goacache.CacheSetRequest{
Key: "key",
Namespace: ptr.String("namespace"),
Scope: ptr.String("scope"),
Data: map[string]interface{}{"test": "value"},
},
cache: &cachefakes.FakeCache{
SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return nil
},
},
queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error {
return errors.New(errors.Unknown, "failed to send event")
}},
errkind: errors.Unknown,
errtext: "failed to send event",
},
{
name: "successfully set value in cache and send an event to queue",
req: &goacache.CacheSetRequest{
Key: "key",
Namespace: ptr.String("namespace"),
Scope: ptr.String("scope"),
Data: map[string]interface{}{"test": "value"},
},
cache: &cachefakes.FakeCache{
SetStub: func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return nil
},
},
queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error {
return nil
}},
errtext: "",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc := cache.New(test.cache, test.queue, zap.NewNop())
err := svc.SetExternal(context.Background(), test.req)
if err == nil {
assert.Empty(t, test.errtext)
} else {
assert.Error(t, err)
e, ok := err.(*errors.Error)
assert.True(t, ok)
assert.Equal(t, test.errkind, e.Kind)
assert.Contains(t, e.Error(), test.errtext)
}
})
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment