Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package queue
import (
"context"
"fmt"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
"code.vereign.com/gaiax/tsa/golib/errors"
)
// eventType is the string newEvent sets as both the type and the source
// attribute of every event it builds.
const eventType = "External cache input"
// Client publishes key/value events through an underlying CloudEvents client.
type Client struct {
	// eventClient is the CloudEvents client that Send delivers events with.
	eventClient cloudevents.Client
}
// Data is the JSON payload attached to each event built by newEvent.
type Data struct {
	// Key is the key passed to Send, serialized as "key".
	Key string `json:"key"`
	// Value is the data passed to Send formatted with "%v", serialized as "value".
	Value string `json:"value"`
}
// New returns a Client that sends its events through eventClient.
func New(eventClient cloudevents.Client) *Client {
	c := new(Client)
	c.eventClient = eventClient
	return c
}
// Send wraps key and data in a new event and hands it to the underlying
// CloudEvents client.
//
// It returns the error from building the event, or a new error naming the key
// and the send result when cloudevents.IsUndelivered reports true for that
// result. In every other case it returns nil.
//
// NOTE(review): only IsUndelivered is checked; a result that was delivered but
// negatively acknowledged appears to be treated as success — confirm this is
// intended.
func (c *Client) Send(ctx context.Context, key string, data interface{}) error {
	evt, err := newEvent(key, data)
	if err != nil {
		return err
	}

	if result := c.eventClient.Send(ctx, *evt); cloudevents.IsUndelivered(result) {
		msg := fmt.Sprintf("failed to send event for key: %s, reason: %v", key, result)
		return errors.New(msg)
	}

	return nil
}
// newEvent builds an event carrying key and data as a JSON-encoded Data
// payload.
//
// The event gets a freshly generated UUID as its ID, eventType as both its
// source and its type, and the current time as its timestamp. data is
// converted to a string with the "%v" verb before being stored in the
// payload. The error from SetData is returned unchanged, with a nil event.
func newEvent(key string, data interface{}) (*event.Event, error) {
	evt := cloudevents.NewEvent()
	evt.SetID(uuid.NewString()) // required field
	evt.SetSource(eventType)    // required field
	evt.SetType(eventType)      // required field
	evt.SetTime(time.Now())

	payload := &Data{
		Key:   key,
		Value: fmt.Sprintf("%v", data),
	}
	if err := evt.SetData(event.ApplicationJSON, payload); err != nil {
		return nil, err
	}

	return &evt, nil
}