Skip to content
Snippets Groups Projects
notify.go 2.94 KiB
Newer Older
  • Learn to ignore specific revisions
  • package notify
    
    import (
    
    	"gitlab.eclipse.org/eclipse/xfsc/tsa/golib/errors"
    
    	"gitlab.eclipse.org/eclipse/xfsc/tsa/policy/internal/storage"
    
    //go:generate counterfeiter . Events
    
    //go:generate counterfeiter . Storage
    
    
    type Events interface {
    	Send(ctx context.Context, data any) error
    
    	PolicyChangeSubscribers(ctx context.Context, policyName, policyGroup, policyVersion string) ([]*storage.Subscriber, error)
    
    type Notifier struct {
    
    	events  Events
    	storage Storage
    	client  *http.Client
    	logger  *zap.Logger
    
    type EventPolicyChange struct {
    	Name    string `json:"name"`
    	Version string `json:"version"`
    	Group   string `json:"group"`
    }
    
    
    // New creates a policy change notifier for interested subscribers.
    // It can notify for policy changes both via MessageQueue or Web hooks.
    
    func New(events Events, storage Storage, client *http.Client, logger *zap.Logger) *Notifier {
    	return &Notifier{events: events, storage: storage, client: client, logger: logger}
    
    // PolicyDataChange is called when the policies source code or data are updated
    // in storage. The function will notify subscribers of the given changes.
    
    func (n *Notifier) PolicyDataChange(ctx context.Context, policyName, policyGroup, policyVersion string) error {
    
    	logger := n.logger.With(zap.String("operation", "PolicyDataChange"))
    
    	event := &EventPolicyChange{
    		Name:    policyName,
    		Version: policyVersion,
    		Group:   policyGroup,
    
    	go func() {
    		err := n.notifySubscribers(ctx, event)
    		if err != nil {
    			logger.Error("error notifying subscribers", zap.Error(err))
    		}
    	}()
    
    	return n.events.Send(ctx, event)
    }
    
    func (n *Notifier) notifySubscribers(ctx context.Context, event *EventPolicyChange) error {
    
    	subscribers, err := n.storage.PolicyChangeSubscribers(ctx, event.Name, event.Group, event.Version)
    	if err != nil {
    
    	}
    
    	for _, subscriber := range subscribers {
    
    		if err := n.notifySubscriber(ctx, subscriber, event); err != nil {
    			n.logger.Error("error notifying subscriber webhook",
    				zap.Error(err),
    				zap.String("subscriber", subscriber.Name),
    				zap.String("webhookURL", subscriber.WebhookURL),
    			)
    		}
    
    func (n *Notifier) notifySubscriber(ctx context.Context, subscriber *storage.Subscriber, event *EventPolicyChange) error {
    	eventJSON, err := json.Marshal(event)
    	if err != nil {
    		return err
    	}
    
    	req, err := http.NewRequestWithContext(ctx, http.MethodPost, subscriber.WebhookURL, bytes.NewBuffer(eventJSON))
    
    	req.Header.Set("Content-Type", "application/json")
    
    	res, err := n.client.Do(req)
    
    		return err
    	}
    	defer res.Body.Close()
    
    	if res.StatusCode != http.StatusOK {
    		return errors.New(errors.GetKind(res.StatusCode), getErrorBody(res))
    
    func getErrorBody(resp *http.Response) string {
    	body, err := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
    	if err != nil {
    		return ""