Skip to content
Snippets Groups Projects
Commit 8c55869b authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#19 endpoint for external cache input

parent 8bf096e7
No related branches found
No related tags found
1 merge request!8Endpoint for external cache input
Pipeline #52136 passed with stages
in 53 seconds
......@@ -7,8 +7,6 @@ import (
"net/http"
"time"
"github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
......@@ -50,22 +48,9 @@ func main() {
// create redis client
redis := redis.New(cfg.Redis.Addr, cfg.Redis.User, cfg.Redis.Pass, cfg.Redis.DB, cfg.Redis.TTL)
// create cloudevents nats sender
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
sender, err := nats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, nats.NatsOptions())
if err != nil {
log.Fatalf("failed to create nats sender, %v", err)
}
defer sender.Close(context.Background()) //nolint:errcheck
// create cloudevents client
eventsClient, err := cloudevents.NewClient(sender)
if err != nil {
log.Fatalf("failed to create cloudevents client, %v", err)
}
// create event client
event := event.New(eventsClient)
events := event.New(cfg.Nats.Addr, cfg.Nats.Subject)
defer events.CLose(context.Background())
// create services
var (
......@@ -73,7 +58,7 @@ func main() {
healthSvc goahealth.Service
)
{
cacheSvc = cache.New(redis, event, logger)
cacheSvc = cache.New(redis, events, logger)
healthSvc = health.New()
}
......
......@@ -3,8 +3,11 @@ package event
import (
"context"
"fmt"
"log"
"time"
"github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
......@@ -13,6 +16,7 @@ import (
const eventType = "cache_set_event"
type Client struct {
sender *nats.Sender
events cloudevents.Client
}
......@@ -20,12 +24,28 @@ type Data struct {
Key string `json:"key"`
}
func New(eventClient cloudevents.Client) *Client {
return &Client{events: eventClient}
func New(addr, subject string) *Client {
// create cloudevents nats sender
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
sender, err := nats.NewSender(addr, subject, nats.NatsOptions())
if err != nil {
log.Fatalf("failed to create nats sender, %v", err)
}
// create cloudevents client
eventsClient, err := cloudevents.NewClient(sender)
if err != nil {
log.Fatalf("failed to create cloudevents client, %v", err)
}
return &Client{
sender: sender,
events: eventsClient,
}
}
func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
e, err := newEvent(key, data)
func (c *Client) Send(ctx context.Context, key string) error {
e, err := newEvent(key)
if err != nil {
return err
}
......@@ -38,7 +58,11 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
return nil
}
func newEvent(key string, data interface{}) (*event.Event, error) {
func (c *Client) CLose(ctx context.Context) error {
return c.sender.Close(ctx)
}
func newEvent(key string) (*event.Event, error) {
e := cloudevents.NewEvent()
e.SetID(uuid.NewString()) // required field
e.SetSource("cache") // required field
......
......@@ -8,13 +8,12 @@ import (
"code.vereign.com/gaiax/tsa/cache/internal/service/cache"
)
type FakeQueue struct {
SendStub func(context.Context, string, interface{}) error
type FakeEvents struct {
SendStub func(context.Context, string) error
sendMutex sync.RWMutex
sendArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 interface{}
}
sendReturns struct {
result1 error
......@@ -26,20 +25,19 @@ type FakeQueue struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{}) error {
func (fake *FakeEvents) Send(arg1 context.Context, arg2 string) error {
fake.sendMutex.Lock()
ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)]
fake.sendArgsForCall = append(fake.sendArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 interface{}
}{arg1, arg2, arg3})
}{arg1, arg2})
stub := fake.SendStub
fakeReturns := fake.sendReturns
fake.recordInvocation("Send", []interface{}{arg1, arg2, arg3})
fake.recordInvocation("Send", []interface{}{arg1, arg2})
fake.sendMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
......@@ -47,26 +45,26 @@ func (fake *FakeQueue) Send(arg1 context.Context, arg2 string, arg3 interface{})
return fakeReturns.result1
}
func (fake *FakeQueue) SendCallCount() int {
func (fake *FakeEvents) SendCallCount() int {
fake.sendMutex.RLock()
defer fake.sendMutex.RUnlock()
return len(fake.sendArgsForCall)
}
func (fake *FakeQueue) SendCalls(stub func(context.Context, string, interface{}) error) {
func (fake *FakeEvents) SendCalls(stub func(context.Context, string) error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = stub
}
func (fake *FakeQueue) SendArgsForCall(i int) (context.Context, string, interface{}) {
func (fake *FakeEvents) SendArgsForCall(i int) (context.Context, string) {
fake.sendMutex.RLock()
defer fake.sendMutex.RUnlock()
argsForCall := fake.sendArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeQueue) SendReturns(result1 error) {
func (fake *FakeEvents) SendReturns(result1 error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = nil
......@@ -75,7 +73,7 @@ func (fake *FakeQueue) SendReturns(result1 error) {
}{result1}
}
func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) {
func (fake *FakeEvents) SendReturnsOnCall(i int, result1 error) {
fake.sendMutex.Lock()
defer fake.sendMutex.Unlock()
fake.SendStub = nil
......@@ -89,7 +87,7 @@ func (fake *FakeQueue) SendReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeQueue) Invocations() map[string][][]interface{} {
func (fake *FakeEvents) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.sendMutex.RLock()
......@@ -101,7 +99,7 @@ func (fake *FakeQueue) Invocations() map[string][][]interface{} {
return copiedInvocations
}
func (fake *FakeQueue) recordInvocation(key string, args []interface{}) {
func (fake *FakeEvents) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
......@@ -113,4 +111,4 @@ func (fake *FakeQueue) recordInvocation(key string, args []interface{}) {
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ cache.Queue = new(FakeQueue)
var _ cache.Events = new(FakeEvents)
......@@ -12,27 +12,27 @@ import (
)
//go:generate counterfeiter . Cache
//go:generate counterfeiter . Queue
//go:generate counterfeiter . Events
type Cache interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
type Queue interface {
Send(ctx context.Context, key string, value interface{}) error
type Events interface {
Send(ctx context.Context, key string) error
}
type Service struct {
cache Cache
queue Queue
events Events
logger *zap.Logger
}
func New(cache Cache, queue Queue, logger *zap.Logger) *Service {
func New(cache Cache, queue Events, logger *zap.Logger) *Service {
return &Service{
cache: cache,
queue: queue,
events: queue,
logger: logger,
}
}
......@@ -106,7 +106,7 @@ func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) e
key := makeCacheKey(req.Key, req.Namespace, req.Scope)
// send an event for the input
if err := s.queue.Send(ctx, key, req.Data); err != nil {
if err := s.events.Send(ctx, key); err != nil {
logger.Error("error sending an event for external input", zap.Error(err))
return errors.New("error sending an event for external input", err)
}
......
......@@ -186,10 +186,10 @@ func TestService_Set(t *testing.T) {
func TestService_SetExternal(t *testing.T) {
tests := []struct {
name string
cache *cachefakes.FakeCache
queue *cachefakes.FakeQueue
req *goacache.CacheSetRequest
name string
cache *cachefakes.FakeCache
events *cachefakes.FakeEvents
req *goacache.CacheSetRequest
res interface{}
errkind errors.Kind
......@@ -224,14 +224,14 @@ func TestService_SetExternal(t *testing.T) {
return nil
},
},
queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error {
events: &cachefakes.FakeEvents{SendStub: func(ctx context.Context, s string) error {
return errors.New(errors.Unknown, "failed to send event")
}},
errkind: errors.Unknown,
errtext: "failed to send event",
},
{
name: "successfully set value in cache and send an event to queue",
name: "successfully set value in cache and send an event to events",
req: &goacache.CacheSetRequest{
Key: "key",
Namespace: ptr.String("namespace"),
......@@ -243,7 +243,7 @@ func TestService_SetExternal(t *testing.T) {
return nil
},
},
queue: &cachefakes.FakeQueue{SendStub: func(ctx context.Context, s string, i interface{}) error {
events: &cachefakes.FakeEvents{SendStub: func(ctx context.Context, s string) error {
return nil
}},
errtext: "",
......@@ -252,7 +252,7 @@ func TestService_SetExternal(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc := cache.New(test.cache, test.queue, zap.NewNop())
svc := cache.New(test.cache, test.events, zap.NewNop())
err := svc.SetExternal(context.Background(), test.req)
if err == nil {
assert.Empty(t, test.errtext)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment