Skip to content
Snippets Groups Projects
Commit 8bf096e7 authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#19 code review changes

parent c56f0f69
No related branches found
No related tags found
1 merge request!8Endpoint for external cache input
Pipeline #52104 passed with stages
in 59 seconds
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"net/http" "net/http"
"time" "time"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2" "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2" cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig" "github.com/kelseyhightower/envconfig"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -22,7 +22,7 @@ import ( ...@@ -22,7 +22,7 @@ import (
goahealthsrv "code.vereign.com/gaiax/tsa/cache/gen/http/health/server" goahealthsrv "code.vereign.com/gaiax/tsa/cache/gen/http/health/server"
goaopenapisrv "code.vereign.com/gaiax/tsa/cache/gen/http/openapi/server" goaopenapisrv "code.vereign.com/gaiax/tsa/cache/gen/http/openapi/server"
"code.vereign.com/gaiax/tsa/cache/gen/openapi" "code.vereign.com/gaiax/tsa/cache/gen/openapi"
"code.vereign.com/gaiax/tsa/cache/internal/clients/queue" "code.vereign.com/gaiax/tsa/cache/internal/clients/event"
"code.vereign.com/gaiax/tsa/cache/internal/clients/redis" "code.vereign.com/gaiax/tsa/cache/internal/clients/redis"
"code.vereign.com/gaiax/tsa/cache/internal/config" "code.vereign.com/gaiax/tsa/cache/internal/config"
"code.vereign.com/gaiax/tsa/cache/internal/service" "code.vereign.com/gaiax/tsa/cache/internal/service"
...@@ -52,7 +52,7 @@ func main() { ...@@ -52,7 +52,7 @@ func main() {
// create cloudevents nats sender // create cloudevents nats sender
// other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol // other protocol implementations: https://github.com/cloudevents/sdk-go/tree/main/protocol
sender, err := cenats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, cenats.NatsOptions()) sender, err := nats.NewSender(cfg.Nats.Addr, cfg.Nats.Subject, nats.NatsOptions())
if err != nil { if err != nil {
log.Fatalf("failed to create nats sender, %v", err) log.Fatalf("failed to create nats sender, %v", err)
} }
...@@ -64,8 +64,8 @@ func main() { ...@@ -64,8 +64,8 @@ func main() {
log.Fatalf("failed to create cloudevents client, %v", err) log.Fatalf("failed to create cloudevents client, %v", err)
} }
// create queue client // create event client
queue := queue.New(eventsClient) event := event.New(eventsClient)
// create services // create services
var ( var (
...@@ -73,7 +73,7 @@ func main() { ...@@ -73,7 +73,7 @@ func main() {
healthSvc goahealth.Service healthSvc goahealth.Service
) )
{ {
cacheSvc = cache.New(redis, queue, logger) cacheSvc = cache.New(redis, event, logger)
healthSvc = health.New() healthSvc = health.New()
} }
......
package queue package event
import ( import (
"context" "context"
...@@ -10,19 +10,18 @@ import ( ...@@ -10,19 +10,18 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
const eventType = "External cache input" const eventType = "cache_set_event"
type Client struct { type Client struct {
eventClient cloudevents.Client events cloudevents.Client
} }
type Data struct { type Data struct {
Key string `json:"key"` Key string `json:"key"`
Value string `json:"value"`
} }
func New(eventClient cloudevents.Client) *Client { func New(eventClient cloudevents.Client) *Client {
return &Client{eventClient: eventClient} return &Client{events: eventClient}
} }
func (c *Client) Send(ctx context.Context, key string, data interface{}) error { func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
...@@ -31,7 +30,7 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error { ...@@ -31,7 +30,7 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
return err return err
} }
res := c.eventClient.Send(ctx, *e) res := c.events.Send(ctx, *e)
if cloudevents.IsUndelivered(res) { if cloudevents.IsUndelivered(res) {
return fmt.Errorf("failed to send event for key: %s, reason: %v", key, res) return fmt.Errorf("failed to send event for key: %s, reason: %v", key, res)
} }
...@@ -42,13 +41,12 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error { ...@@ -42,13 +41,12 @@ func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
func newEvent(key string, data interface{}) (*event.Event, error) { func newEvent(key string, data interface{}) (*event.Event, error) {
e := cloudevents.NewEvent() e := cloudevents.NewEvent()
e.SetID(uuid.NewString()) // required field e.SetID(uuid.NewString()) // required field
e.SetSource(eventType) // required field e.SetSource("cache") // required field
e.SetType(eventType) // required field e.SetType(eventType) // required field
e.SetTime(time.Now()) e.SetTime(time.Now())
err := e.SetData(event.ApplicationJSON, &Data{ err := e.SetData(event.ApplicationJSON, &Data{
Key: key, Key: key,
Value: fmt.Sprintf("%v", data),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -116,19 +116,11 @@ func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) e ...@@ -116,19 +116,11 @@ func (s *Service) SetExternal(ctx context.Context, req *cache.CacheSetRequest) e
func makeCacheKey(key string, namespace, scope *string) string { func makeCacheKey(key string, namespace, scope *string) string {
k := key k := key
var n, s string if namespace != nil && *namespace != "" {
if namespace != nil { k += "," + *namespace
n = *namespace
} }
if scope != nil { if scope != nil && *scope != "" {
s = *scope k += "," + *scope
}
if n != "" {
k += "," + n
}
if s != "" {
k += "," + s
} }
return k return k
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment