diff --git a/cmd/policy/main.go b/cmd/policy/main.go index 7396227f54da7a709af1ec8d284d84c5c0aed7e0..fb44df7f47d972b412ccec6c1627b4fae6da8251 100644 --- a/cmd/policy/main.go +++ b/cmd/policy/main.go @@ -93,7 +93,7 @@ func main() { defer events.Close(context.Background()) // create policy change subscribers collection - var subscribers []storage.PolicyChangeSubscriber + var subscribers []storage.PolicySubscriber // create rego policy cache regocache := regocache.New() @@ -116,7 +116,7 @@ func main() { } // subscribe the cache for policy data changes - storage.AddPolicyChangeSubscribers(subscribers...) + storage.AddPolicySubscribers(subscribers...) // create policy data refresher var dataRefresher *policydata.Refresher diff --git a/internal/notify/notify.go b/internal/notify/notify.go index 61f4de0ba7ba07b7aa2a6c0446425fec1c84ee59..8f8fe534fbe4a73f5020bfa11b80747d9c1d3572 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -21,7 +21,7 @@ type Events interface { } type Storage interface { - PolicyChangeSubscribers(ctx context.Context, policyRepository, policyName, policyGroup, policyVersion string) ([]*storage.Subscriber, error) + PolicySubscribers(ctx context.Context, policyRepository, policyName, policyGroup, policyVersion string) ([]*storage.Subscriber, error) } type Notifier struct { @@ -69,7 +69,7 @@ func (n *Notifier) PolicyDataChange(ctx context.Context, policyRepository, polic } func (n *Notifier) notifySubscribers(ctx context.Context, event *EventPolicyChange) error { - subscribers, err := n.storage.PolicyChangeSubscribers(ctx, event.Repository, event.Name, event.Group, event.Version) + subscribers, err := n.storage.PolicySubscribers(ctx, event.Repository, event.Name, event.Group, event.Version) if err != nil { return err } diff --git a/internal/notify/notify_test.go b/internal/notify/notify_test.go index 95996d0d257b68c48d332d51ed2a2361d67e84d8..c155286b8f40ad6bf3c974cbed6f3d813c666b99 100644 --- a/internal/notify/notify_test.go +++ b/internal/notify/notify_test.go @@ -29,7 +29,7 @@ func TestNotify_PolicyDataChange(t *testing.T) { { name: "error when sending event", eventPolicyChange: ¬ify.EventPolicyChange{Repository: "exampleRepo", Name: "exampleName", Version: "exampleVersion", Group: "exampleGroup"}, - storage: ¬ifyfakes.FakeStorage{PolicyChangeSubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { + storage: ¬ifyfakes.FakeStorage{PolicySubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { return []*storage.Subscriber{}, nil }}, events: ¬ifyfakes.FakeEvents{SendStub: func(ctx context.Context, a any) error { @@ -42,7 +42,7 @@ func TestNotify_PolicyDataChange(t *testing.T) { { name: "sending event is successful", eventPolicyChange: ¬ify.EventPolicyChange{Repository: "exampleRepo", Name: "exampleName", Version: "exampleVersion", Group: "exampleGroup"}, - storage: ¬ifyfakes.FakeStorage{PolicyChangeSubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { + storage: ¬ifyfakes.FakeStorage{PolicySubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { return []*storage.Subscriber{}, nil }}, events: ¬ifyfakes.FakeEvents{SendStub: func(ctx context.Context, a any) error { @@ -53,7 +53,7 @@ func TestNotify_PolicyDataChange(t *testing.T) { { name: "storage return error", eventPolicyChange: ¬ify.EventPolicyChange{Repository: "exampleRepo", Name: "exampleName", Version: "exampleVersion", Group: "exampleGroup"}, - storage: ¬ifyfakes.FakeStorage{PolicyChangeSubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { + storage: ¬ifyfakes.FakeStorage{PolicySubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { return []*storage.Subscriber{}, fmt.Errorf("some error") }}, events: ¬ifyfakes.FakeEvents{SendStub: func(ctx context.Context, a any) error { @@ -66,7 +66,7 @@ func TestNotify_PolicyDataChange(t *testing.T) { { name: "wrong webhook url return error", eventPolicyChange: ¬ify.EventPolicyChange{Repository: "exampleRepo", Name: "exampleName", Version: "exampleVersion", Group: "exampleGroup"}, - storage: ¬ifyfakes.FakeStorage{PolicyChangeSubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { + storage: ¬ifyfakes.FakeStorage{PolicySubscribersStub: func(ctx context.Context, s1, s2, s3, s4 string) ([]*storage.Subscriber, error) { return []*storage.Subscriber{{WebhookURL: "wrong/url"}}, nil }}, events: ¬ifyfakes.FakeEvents{SendStub: func(ctx context.Context, a any) error { diff --git a/internal/notify/notifyfakes/fake_storage.go b/internal/notify/notifyfakes/fake_storage.go index 184a47fdd46ccdd7d4192726b148419e5f84b9e4..3a2bb313ec85dce3d64dde9bad7e22e948d59501 100644 --- a/internal/notify/notifyfakes/fake_storage.go +++ b/internal/notify/notifyfakes/fake_storage.go @@ -10,20 +10,20 @@ import ( ) type FakeStorage struct { - PolicyChangeSubscribersStub func(context.Context, string, string, string, string) ([]*storage.Subscriber, error) - policyChangeSubscribersMutex sync.RWMutex - policyChangeSubscribersArgsForCall []struct { + PolicySubscribersStub func(context.Context, string, string, string, string) ([]*storage.Subscriber, error) + policySubscribersMutex sync.RWMutex + policySubscribersArgsForCall []struct { arg1 context.Context arg2 string arg3 string arg4 string arg5 string } - policyChangeSubscribersReturns struct { + policySubscribersReturns struct { result1 []*storage.Subscriber result2 error } - policyChangeSubscribersReturnsOnCall map[int]struct { + policySubscribersReturnsOnCall map[int]struct { result1 []*storage.Subscriber result2 error } @@ -31,20 +31,20 @@ type FakeStorage struct { invocationsMutex sync.RWMutex } -func (fake *FakeStorage) PolicyChangeSubscribers(arg1 context.Context, arg2 string, arg3 string, arg4 string, arg5 string) ([]*storage.Subscriber, error) { - fake.policyChangeSubscribersMutex.Lock() - ret, specificReturn := fake.policyChangeSubscribersReturnsOnCall[len(fake.policyChangeSubscribersArgsForCall)] - fake.policyChangeSubscribersArgsForCall = append(fake.policyChangeSubscribersArgsForCall, struct { +func (fake *FakeStorage) PolicySubscribers(arg1 context.Context, arg2 string, arg3 string, arg4 string, arg5 string) ([]*storage.Subscriber, error) { + fake.policySubscribersMutex.Lock() + ret, specificReturn := fake.policySubscribersReturnsOnCall[len(fake.policySubscribersArgsForCall)] + fake.policySubscribersArgsForCall = append(fake.policySubscribersArgsForCall, struct { arg1 context.Context arg2 string arg3 string arg4 string arg5 string }{arg1, arg2, arg3, arg4, arg5}) - stub := fake.PolicyChangeSubscribersStub - fakeReturns := fake.policyChangeSubscribersReturns - fake.recordInvocation("PolicyChangeSubscribers", []interface{}{arg1, arg2, arg3, arg4, arg5}) - fake.policyChangeSubscribersMutex.Unlock() + stub := fake.PolicySubscribersStub + fakeReturns := fake.policySubscribersReturns + fake.recordInvocation("PolicySubscribers", []interface{}{arg1, arg2, arg3, arg4, arg5}) + fake.policySubscribersMutex.Unlock() if stub != nil { return stub(arg1, arg2, arg3, arg4, arg5) } @@ -54,46 +54,46 @@ func (fake *FakeStorage) PolicyChangeSubscribers(arg1 context.Context, arg2 stri return fakeReturns.result1, fakeReturns.result2 } -func (fake *FakeStorage) PolicyChangeSubscribersCallCount() int { - fake.policyChangeSubscribersMutex.RLock() - defer fake.policyChangeSubscribersMutex.RUnlock() - return len(fake.policyChangeSubscribersArgsForCall) +func (fake *FakeStorage) PolicySubscribersCallCount() int { + fake.policySubscribersMutex.RLock() + defer fake.policySubscribersMutex.RUnlock() + return len(fake.policySubscribersArgsForCall) } -func (fake *FakeStorage) PolicyChangeSubscribersCalls(stub func(context.Context, string, string, string, string) ([]*storage.Subscriber, error)) { - fake.policyChangeSubscribersMutex.Lock() - defer fake.policyChangeSubscribersMutex.Unlock() - fake.PolicyChangeSubscribersStub = stub +func (fake *FakeStorage) PolicySubscribersCalls(stub func(context.Context, string, string, string, string) ([]*storage.Subscriber, error)) { + fake.policySubscribersMutex.Lock() + defer fake.policySubscribersMutex.Unlock() + fake.PolicySubscribersStub = stub } -func (fake *FakeStorage) PolicyChangeSubscribersArgsForCall(i int) (context.Context, string, string, string, string) { - fake.policyChangeSubscribersMutex.RLock() - defer fake.policyChangeSubscribersMutex.RUnlock() - argsForCall := fake.policyChangeSubscribersArgsForCall[i] +func (fake *FakeStorage) PolicySubscribersArgsForCall(i int) (context.Context, string, string, string, string) { + fake.policySubscribersMutex.RLock() + defer fake.policySubscribersMutex.RUnlock() + argsForCall := fake.policySubscribersArgsForCall[i] return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } -func (fake *FakeStorage) PolicyChangeSubscribersReturns(result1 []*storage.Subscriber, result2 error) { - fake.policyChangeSubscribersMutex.Lock() - defer fake.policyChangeSubscribersMutex.Unlock() - fake.PolicyChangeSubscribersStub = nil - fake.policyChangeSubscribersReturns = struct { +func (fake *FakeStorage) PolicySubscribersReturns(result1 []*storage.Subscriber, result2 error) { + fake.policySubscribersMutex.Lock() + defer fake.policySubscribersMutex.Unlock() + fake.PolicySubscribersStub = nil + fake.policySubscribersReturns = struct { result1 []*storage.Subscriber result2 error }{result1, result2} } -func (fake *FakeStorage) PolicyChangeSubscribersReturnsOnCall(i int, result1 []*storage.Subscriber, result2 error) { - fake.policyChangeSubscribersMutex.Lock() - defer fake.policyChangeSubscribersMutex.Unlock() - fake.PolicyChangeSubscribersStub = nil - if fake.policyChangeSubscribersReturnsOnCall == nil { - fake.policyChangeSubscribersReturnsOnCall = make(map[int]struct { +func (fake *FakeStorage) PolicySubscribersReturnsOnCall(i int, result1 []*storage.Subscriber, result2 error) { + fake.policySubscribersMutex.Lock() + defer fake.policySubscribersMutex.Unlock() + fake.PolicySubscribersStub = nil + if fake.policySubscribersReturnsOnCall == nil { + fake.policySubscribersReturnsOnCall = make(map[int]struct { result1 []*storage.Subscriber result2 error }) } - fake.policyChangeSubscribersReturnsOnCall[i] = struct { + fake.policySubscribersReturnsOnCall[i] = struct { result1 []*storage.Subscriber result2 error }{result1, result2} @@ -102,8 +102,8 @@ func (fake *FakeStorage) PolicyChangeSubscribersReturnsOnCall(i int, result1 []* func (fake *FakeStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.policyChangeSubscribersMutex.RLock() - defer fake.policyChangeSubscribersMutex.RUnlock() + fake.policySubscribersMutex.RLock() + defer fake.policySubscribersMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/service/policy/bundle.go b/internal/service/policy/bundle.go index cd644f6e6e3306bcbebb957c48d94ee3f78faa31..2d7dd5c4924b80b7f910d4a4a1ae25ad72dc1c68 100644 --- a/internal/service/policy/bundle.go +++ b/internal/service/policy/bundle.go @@ -200,7 +200,7 @@ func (s *Service) policyFromBundle(bundle []byte) (*storage.Policy, error) { func policyExportConfig(p *storage.Policy) (*exportConfig, error) { if strings.TrimSpace(p.ExportConfig) == "" { - return nil, fmt.Errorf("policy export configuration is not defined") + return nil, errors.New(errors.Forbidden, "policy export configuration is not defined") } var cfg exportConfig diff --git a/internal/service/policy/policyfakes/fake_storage.go b/internal/service/policy/policyfakes/fake_storage.go index 574b2accaddf48a7ab0c18f38ebb4dde77582e9b..ffa91cea346d5add639d5f6b5bb59a599d67be9d 100644 --- a/internal/service/policy/policyfakes/fake_storage.go +++ b/internal/service/policy/policyfakes/fake_storage.go @@ -23,10 +23,10 @@ type FakeStorage struct { result1 []*storage.PolicyAutoImport result2 error } - AddPolicyChangeSubscribersStub func(...storage.PolicyChangeSubscriber) - addPolicyChangeSubscribersMutex sync.RWMutex - addPolicyChangeSubscribersArgsForCall []struct { - arg1 []storage.PolicyChangeSubscriber + AddPolicySubscribersStub func(...storage.PolicySubscriber) + addPolicySubscribersMutex sync.RWMutex + addPolicySubscribersArgsForCall []struct { + arg1 []storage.PolicySubscriber } AutoImportConfigStub func(context.Context, string) (*storage.PolicyAutoImport, error) autoImportConfigMutex sync.RWMutex @@ -207,6 +207,25 @@ type FakeStorage struct { setPolicyLockReturnsOnCall map[int]struct { result1 error } + SubscriberStub func(context.Context, string, string, string, string, string, string) (*storage.Subscriber, error) + subscriberMutex sync.RWMutex + subscriberArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + arg5 string + arg6 string + arg7 string + } + subscriberReturns struct { + result1 *storage.Subscriber + result2 error + } + subscriberReturnsOnCall map[int]struct { + result1 *storage.Subscriber + result2 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -275,35 +294,35 @@ func (fake *FakeStorage) ActiveImportConfigsReturnsOnCall(i int, result1 []*stor }{result1, result2} } -func (fake *FakeStorage) AddPolicyChangeSubscribers(arg1 ...storage.PolicyChangeSubscriber) { - fake.addPolicyChangeSubscribersMutex.Lock() - fake.addPolicyChangeSubscribersArgsForCall = append(fake.addPolicyChangeSubscribersArgsForCall, struct { - arg1 []storage.PolicyChangeSubscriber +func (fake *FakeStorage) AddPolicySubscribers(arg1 ...storage.PolicySubscriber) { + fake.addPolicySubscribersMutex.Lock() + fake.addPolicySubscribersArgsForCall = append(fake.addPolicySubscribersArgsForCall, struct { + arg1 []storage.PolicySubscriber }{arg1}) - stub := fake.AddPolicyChangeSubscribersStub - fake.recordInvocation("AddPolicyChangeSubscribers", []interface{}{arg1}) - fake.addPolicyChangeSubscribersMutex.Unlock() + stub := fake.AddPolicySubscribersStub + fake.recordInvocation("AddPolicySubscribers", []interface{}{arg1}) + fake.addPolicySubscribersMutex.Unlock() if stub != nil { - fake.AddPolicyChangeSubscribersStub(arg1...) + fake.AddPolicySubscribersStub(arg1...) } } -func (fake *FakeStorage) AddPolicyChangeSubscribersCallCount() int { - fake.addPolicyChangeSubscribersMutex.RLock() - defer fake.addPolicyChangeSubscribersMutex.RUnlock() - return len(fake.addPolicyChangeSubscribersArgsForCall) +func (fake *FakeStorage) AddPolicySubscribersCallCount() int { + fake.addPolicySubscribersMutex.RLock() + defer fake.addPolicySubscribersMutex.RUnlock() + return len(fake.addPolicySubscribersArgsForCall) } -func (fake *FakeStorage) AddPolicyChangeSubscribersCalls(stub func(...storage.PolicyChangeSubscriber)) { - fake.addPolicyChangeSubscribersMutex.Lock() - defer fake.addPolicyChangeSubscribersMutex.Unlock() - fake.AddPolicyChangeSubscribersStub = stub +func (fake *FakeStorage) AddPolicySubscribersCalls(stub func(...storage.PolicySubscriber)) { + fake.addPolicySubscribersMutex.Lock() + defer fake.addPolicySubscribersMutex.Unlock() + fake.AddPolicySubscribersStub = stub } -func (fake *FakeStorage) AddPolicyChangeSubscribersArgsForCall(i int) []storage.PolicyChangeSubscriber { - fake.addPolicyChangeSubscribersMutex.RLock() - defer fake.addPolicyChangeSubscribersMutex.RUnlock() - argsForCall := fake.addPolicyChangeSubscribersArgsForCall[i] +func (fake *FakeStorage) AddPolicySubscribersArgsForCall(i int) []storage.PolicySubscriber { + fake.addPolicySubscribersMutex.RLock() + defer fake.addPolicySubscribersMutex.RUnlock() + argsForCall := fake.addPolicySubscribersArgsForCall[i] return argsForCall.arg1 } @@ -1169,13 +1188,83 @@ func (fake *FakeStorage) SetPolicyLockReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeStorage) Subscriber(arg1 context.Context, arg2 string, arg3 string, arg4 string, arg5 string, arg6 string, arg7 string) (*storage.Subscriber, error) { + fake.subscriberMutex.Lock() + ret, specificReturn := fake.subscriberReturnsOnCall[len(fake.subscriberArgsForCall)] + fake.subscriberArgsForCall = append(fake.subscriberArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 string + arg5 string + arg6 string + arg7 string + }{arg1, arg2, arg3, arg4, arg5, arg6, arg7}) + stub := fake.SubscriberStub + fakeReturns := fake.subscriberReturns + fake.recordInvocation("Subscriber", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6, arg7}) + fake.subscriberMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) SubscriberCallCount() int { + fake.subscriberMutex.RLock() + defer fake.subscriberMutex.RUnlock() + return len(fake.subscriberArgsForCall) +} + +func (fake *FakeStorage) SubscriberCalls(stub func(context.Context, string, string, string, string, string, string) (*storage.Subscriber, error)) { + fake.subscriberMutex.Lock() + defer fake.subscriberMutex.Unlock() + fake.SubscriberStub = stub +} + +func (fake *FakeStorage) SubscriberArgsForCall(i int) (context.Context, string, string, string, string, string, string) { + fake.subscriberMutex.RLock() + defer fake.subscriberMutex.RUnlock() + argsForCall := fake.subscriberArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7 +} + +func (fake *FakeStorage) SubscriberReturns(result1 *storage.Subscriber, result2 error) { + fake.subscriberMutex.Lock() + defer fake.subscriberMutex.Unlock() + fake.SubscriberStub = nil + fake.subscriberReturns = struct { + result1 *storage.Subscriber + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) SubscriberReturnsOnCall(i int, result1 *storage.Subscriber, result2 error) { + fake.subscriberMutex.Lock() + defer fake.subscriberMutex.Unlock() + fake.SubscriberStub = nil + if fake.subscriberReturnsOnCall == nil { + fake.subscriberReturnsOnCall = make(map[int]struct { + result1 *storage.Subscriber + result2 error + }) + } + fake.subscriberReturnsOnCall[i] = struct { + result1 *storage.Subscriber + result2 error + }{result1, result2} +} + func (fake *FakeStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.activeImportConfigsMutex.RLock() defer fake.activeImportConfigsMutex.RUnlock() - fake.addPolicyChangeSubscribersMutex.RLock() - defer fake.addPolicyChangeSubscribersMutex.RUnlock() + fake.addPolicySubscribersMutex.RLock() + defer fake.addPolicySubscribersMutex.RUnlock() fake.autoImportConfigMutex.RLock() defer fake.autoImportConfigMutex.RUnlock() fake.autoImportConfigsMutex.RLock() @@ -1204,6 +1293,8 @@ func (fake *FakeStorage) Invocations() map[string][][]interface{} { defer fake.setDataMutex.RUnlock() fake.setPolicyLockMutex.RLock() defer fake.setPolicyLockMutex.RUnlock() + fake.subscriberMutex.RLock() + defer fake.subscriberMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/service/policy/service.go b/internal/service/policy/service.go index 00d73b9dd3154eb4ba3694022cfd23e8b63412f8..4f9c655e6198d5a94472b517bdcabbd71d72c0b6 100644 --- a/internal/service/policy/service.go +++ b/internal/service/policy/service.go @@ -576,6 +576,20 @@ func (s *Service) ListPolicies(ctx context.Context, req *policy.PoliciesRequest) func (s *Service) SubscribeForPolicyChange(ctx context.Context, req *policy.SubscribeRequest) (any, error) { logger := s.logger.With(zap.String("operation", "subscribeForPolicyChange")) + _, err := s.storage.Policy(ctx, req.Repository, req.Group, req.PolicyName, req.Version) + if err != nil { + return nil, err + } + + sub, err := s.storage.Subscriber(ctx, req.Repository, req.Group, req.PolicyName, req.Version, req.WebhookURL, req.Subscriber) + if err != nil && !errors.Is(errors.NotFound, err) { + return nil, errors.New("error while retrieving subscriber", err) + } + + if sub != nil { + return nil, errors.New(errors.Exist, "subscriber already exist") + } + subscriber, err := s.storage.CreateSubscriber(ctx, &storage.Subscriber{ Name: req.Subscriber, WebhookURL: req.WebhookURL, diff --git a/internal/service/policy/service_test.go b/internal/service/policy/service_test.go index 977119d4a5131e7b915042aa789f108505e59cb6..5bb953d7d700ed2488dc00993a9284ba71523589 100644 --- a/internal/service/policy/service_test.go +++ b/internal/service/policy/service_test.go @@ -1106,6 +1106,54 @@ func TestService_SubscribeForPolicyChange(t *testing.T) { errText string }{ + { + name: "policy doesn't exist", + storage: &policyfakes.FakeStorage{PolicyStub: func(ctx context.Context, s1, s2, s3, s4 string) (*storage.Policy, error) { + return nil, errors.New(errors.NotFound, "not found") + }}, + request: &goapolicy.SubscribeRequest{ + WebhookURL: "http://some.url/example", + Subscriber: "Subscriber Name", + Repository: "policy repo", + PolicyName: "policy name", + Group: "policy group", + Version: "policy version", + }, + + errText: "not found", + }, + { + name: "subscriber already exist", + storage: &policyfakes.FakeStorage{SubscriberStub: func(ctx context.Context, s1, s2, s3, s4, s5, s6 string) (*storage.Subscriber, error) { + return &storage.Subscriber{Name: "subscriber"}, nil + }}, + request: &goapolicy.SubscribeRequest{ + WebhookURL: "http://some.url/example", + Subscriber: "Subscriber Name", + Repository: "policy repo", + PolicyName: "policy name", + Group: "policy group", + Version: "policy version", + }, + + errText: "subscriber already exist", + }, + { + name: "error retrieving subscriber", + storage: &policyfakes.FakeStorage{SubscriberStub: func(ctx context.Context, s1, s2, s3, s4, s5, s6 string) (*storage.Subscriber, error) { + return nil, errors.New("some error") + }}, + request: &goapolicy.SubscribeRequest{ + WebhookURL: "http://some.url/example", + Subscriber: "Subscriber Name", + Repository: "policy repo", + PolicyName: "policy name", + Group: "policy group", + Version: "policy version", + }, + + errText: "some error", + }, { name: "error while creating subscriber", storage: &policyfakes.FakeStorage{CreateSubscriberStub: func(ctx context.Context, s *storage.Subscriber) (*storage.Subscriber, error) { @@ -1419,7 +1467,7 @@ func TestService_PolicyPublicKey(t *testing.T) { }, }, errtext: "policy export configuration is not defined", - errkind: errors.Unknown, + errkind: errors.Forbidden, }, { name: "wrong format for policy export configuration", diff --git a/internal/service/policy/storage.go b/internal/service/policy/storage.go index ce98436dc390fff63b30b3822a32a7594e63e7e4..ca2f2df0c68243b7aa923812187af5be936fe9ca 100644 --- a/internal/service/policy/storage.go +++ b/internal/service/policy/storage.go @@ -11,8 +11,9 @@ type Storage interface { SavePolicy(ctx context.Context, policy *storage.Policy) error SetPolicyLock(ctx context.Context, repository, group, name, version string, lock bool) error GetPolicies(ctx context.Context, locked *bool) ([]*storage.Policy, error) - AddPolicyChangeSubscribers(subscribers ...storage.PolicyChangeSubscriber) + AddPolicySubscribers(subscribers ...storage.PolicySubscriber) ListenPolicyDataChanges(ctx context.Context) error + Subscriber(ctx context.Context, policyRepository, policyGroup, policyName, policyVersion, webhook, name string) (*storage.Subscriber, error) CreateSubscriber(ctx context.Context, subscriber *storage.Subscriber) (*storage.Subscriber, error) Close(ctx context.Context) GetData(ctx context.Context, key string) (any, error) diff --git a/internal/storage/memory/storage.go b/internal/storage/memory/storage.go index 8260c321bfec30f4c67d378b5dcf4efa8a89b876..67e1912ea904cd1f9e6db9e5b31834a90eee538a 100644 --- a/internal/storage/memory/storage.go +++ b/internal/storage/memory/storage.go @@ -20,12 +20,15 @@ type KeyConstructor interface { type Storage struct { keyConstructor KeyConstructor - subscribers []storage.PolicyChangeSubscriber + subscribers []storage.PolicySubscriber changes chan storage.Policy mu sync.RWMutex policies map[string]*storage.Policy + muSubscribers sync.RWMutex + policySubscribers map[string]*storage.Subscriber + muCommonStorage sync.RWMutex commonStorage map[string]interface{} @@ -39,11 +42,12 @@ func New(c KeyConstructor, p map[string]*storage.Policy, l *zap.Logger) *Storage ch := make(chan storage.Policy) return &Storage{ - keyConstructor: c, - changes: ch, - policies: p, - commonStorage: map[string]interface{}{}, - logger: l, + keyConstructor: c, + changes: ch, + policies: p, + policySubscribers: map[string]*storage.Subscriber{}, + commonStorage: map[string]interface{}{}, + logger: l, } } @@ -155,7 +159,7 @@ func (s *Storage) UpdateNextRefreshTime(_ context.Context, p *storage.Policy, ne return nil } -func (s *Storage) AddPolicyChangeSubscribers(subscribers ...storage.PolicyChangeSubscriber) { +func (s *Storage) AddPolicySubscribers(subscribers ...storage.PolicySubscriber) { s.subscribers = subscribers } @@ -219,8 +223,26 @@ func (s *Storage) DeleteData(_ context.Context, key string) error { func (s *Storage) Close(_ context.Context) {} -func (s *Storage) CreateSubscriber(_ context.Context, _ *storage.Subscriber) (*storage.Subscriber, error) { - return nil, errors.New(errors.Internal, "function CreateSubscriber is not implemented for memory storage") +func (s *Storage) CreateSubscriber(_ context.Context, sub *storage.Subscriber) (*storage.Subscriber, error) { + s.muSubscribers.RLock() + defer s.muSubscribers.RUnlock() + + s.policySubscribers[sub.PolicyRepository+sub.PolicyGroup+sub.PolicyName+sub.PolicyVersion+sub.WebhookURL+sub.Name] = sub + + res := *sub // don't return the Subscriber by reference + return &res, nil +} + +func (s *Storage) Subscriber(_ context.Context, policyRepository, policyGroup, policyName, policyVersion, webhookURL, name string) (*storage.Subscriber, error) { + s.muSubscribers.RLock() + defer s.muSubscribers.RUnlock() + subscriber, ok := s.policySubscribers[policyRepository+policyGroup+policyName+policyVersion+webhookURL+name] + if !ok { + return nil, errors.New(errors.NotFound, "subscriber not found in memory storage") + } + + res := *subscriber // don't return the Subscriber by reference + return &res, nil } func (s *Storage) SaveAutoImportConfig(_ context.Context, importConfig *storage.PolicyAutoImport) error { diff --git a/internal/storage/memory/storage_test.go b/internal/storage/memory/storage_test.go index 9f2fe031ca804cb29598b16923332195028f7d8b..5433a44f1b110d780afb0b9dc86d12205daff790 100644 --- a/internal/storage/memory/storage_test.go +++ b/internal/storage/memory/storage_test.go @@ -153,6 +153,49 @@ func TestStorage_CommonStorage(t *testing.T) { }) } +func TestStorage_PolicySubscriber(t *testing.T) { + s := memory.New(nil, nil, zap.NewNop()) + subscriber := &storage.Subscriber{ + Name: "name", + WebhookURL: "webhook", + PolicyRepository: "repo", + PolicyName: "policyname", + PolicyGroup: "policygroup", + PolicyVersion: "policyversion", + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + } + t.Run("create policy subscriber", func(t *testing.T) { + sub, err := s.CreateSubscriber(context.Background(), subscriber) + assert.NoError(t, err) + assert.Equal(t, subscriber, subscriber, sub) + }) + + t.Run("get subscriber", func(t *testing.T) { + sub, err := s.Subscriber(context.Background(), + subscriber.PolicyRepository, + subscriber.PolicyGroup, + subscriber.PolicyName, + subscriber.PolicyVersion, + subscriber.WebhookURL, + subscriber.Name) + assert.NoError(t, err) + assert.Equal(t, subscriber, sub) + }) + + t.Run("get subscriber return error", func(t *testing.T) { + sub, err := s.Subscriber(context.Background(), + subscriber.PolicyRepository, + subscriber.PolicyGroup, + subscriber.PolicyName, + subscriber.PolicyVersion, + subscriber.WebhookURL, + "not existing name") + assert.ErrorContains(t, err, "subscriber not found in memory storage") + assert.Nil(t, sub) + }) +} + // makePolicies makes a valid policies map func makePolicies() map[string]*storage.Policy { return map[string]*storage.Policy{ diff --git a/internal/storage/mongodb/storage.go b/internal/storage/mongodb/storage.go index a7ed6ef0a665d53049d13edabc650dc87e2e8d36..e50da13fed97786cc8cbda8147bacf0b3f0fa8db 100644 --- a/internal/storage/mongodb/storage.go +++ b/internal/storage/mongodb/storage.go @@ -4,7 +4,6 @@ import ( "context" goerrors "errors" "fmt" - "strings" "time" "go.mongodb.org/mongo-driver/bson" @@ -32,7 +31,7 @@ type Storage struct { subscriber *mongo.Collection commonStorage *mongo.Collection autoImport *mongo.Collection - subscribers []storage.PolicyChangeSubscriber + subscribers []storage.PolicySubscriber logger *zap.Logger } @@ -62,7 +61,7 @@ func (s *Storage) Policy(ctx context.Context, repository, group, name, version s }) if result.Err() != nil { - if strings.Contains(result.Err().Error(), "no documents in result") { + if goerrors.Is(result.Err(), mongo.ErrNoDocuments) { return nil, errors.New(errors.NotFound, "policy not found") } return nil, result.Err() @@ -153,7 +152,7 @@ func (s *Storage) ListenPolicyDataChanges(ctx context.Context) error { return stream.Err() } -func (s *Storage) AddPolicyChangeSubscribers(subscribers ...storage.PolicyChangeSubscriber) { +func (s *Storage) AddPolicySubscribers(subscribers ...storage.PolicySubscriber) { s.subscribers = subscribers } @@ -262,24 +261,11 @@ func (s *Storage) Close(ctx context.Context) { } func (s *Storage) CreateSubscriber(ctx context.Context, subscriber *storage.Subscriber) (*storage.Subscriber, error) { - _, err := s.policyExist(ctx, subscriber.PolicyRepository, subscriber.PolicyName, subscriber.PolicyGroup, subscriber.PolicyVersion) - if err != nil { - return nil, err - } - - subscriberExist, err := s.subscriberExist(ctx, subscriber) - if err != nil { - return nil, err - } - - if subscriberExist { - return nil, fmt.Errorf("subscriber already exists") - } - subscriber.CreatedAt = time.Now() subscriber.UpdatedAt = time.Now() subscriber.MongoID = primitive.NewObjectID() - _, err = s.subscriber.InsertOne(ctx, subscriber) + + _, err := s.subscriber.InsertOne(ctx, subscriber) if err != nil { return nil, err } @@ -287,7 +273,7 @@ func (s *Storage) CreateSubscriber(ctx context.Context, subscriber *storage.Subs return subscriber, nil } -func (s *Storage) PolicyChangeSubscribers(ctx context.Context, policyRepository, policyName, policyGroup, policyVersion string) ([]*storage.Subscriber, error) { +func (s *Storage) PolicySubscribers(ctx context.Context, policyRepository, policyName, policyGroup, policyVersion string) ([]*storage.Subscriber, error) { cursor, err := s.subscriber.Find(ctx, bson.M{ "policyrepository": policyRepository, "policyname": policyName, @@ -306,6 +292,31 @@ func (s *Storage) PolicyChangeSubscribers(ctx context.Context, policyRepository, return subscribers, nil } +func (s *Storage) Subscriber(ctx context.Context, policyRepository, policyGroup, policyName, policyVersion, webhookURL, name string) (*storage.Subscriber, error) { + result := s.subscriber.FindOne(ctx, bson.M{ + "webhookurl": webhookURL, + "name": name, + "policyrepository": policyRepository, + "policygroup": policyGroup, + "policyname": policyName, + "policyversion": policyVersion, + }) + + if result.Err() != nil { + if goerrors.Is(result.Err(), mongo.ErrNoDocuments) { + return nil, errors.New(errors.NotFound, "subscriber not found") + } + return nil, result.Err() + } + + var subscriber storage.Subscriber + if err := result.Decode(&subscriber); err != nil { + return nil, err + } + + return &subscriber, nil +} + func (s *Storage) SetData(ctx context.Context, key string, data map[string]interface{}) error { opts := options.Update().SetUpsert(true) query := bson.M{"key": key} @@ -344,37 +355,6 @@ func (s *Storage) DeleteData(ctx context.Context, key string) error { return err } -func (s *Storage) subscriberExist(ctx context.Context, subscriber *storage.Subscriber) (bool, error) { - err := s.subscriber.FindOne(ctx, bson.M{ - "name": subscriber.Name, - "webhookurl": subscriber.WebhookURL, - "policyrepository": subscriber.PolicyRepository, - "policyname": subscriber.PolicyName, - "policygroup": subscriber.PolicyGroup, - "policyversion": subscriber.PolicyVersion, - }).Err() - if err != nil { - if goerrors.Is(err, mongo.ErrNoDocuments) { - return false, nil - } - return false, err - } - return true, nil -} - -func (s *Storage) policyExist(ctx context.Context, repository, name, group, version string) (bool, error) { - err := s.policy.FindOne(ctx, bson.M{ - "repository": repository, - "name": name, - "group": group, - "version": version, - }).Err() - if err != nil { - return false, err - } - return true, nil -} - func (s *Storage) SaveAutoImportConfig(ctx context.Context, importConfig *storage.PolicyAutoImport) error { opts := options.Update().SetUpsert(true) filter := bson.M{ diff --git a/internal/storage/types.go b/internal/storage/types.go index 2ebf7490000810ee70f687b766cee436b62895ac..eafb978234e022e0907966f20eb565f3a81e583d 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -9,7 +9,7 @@ import ( const RefreshPostponePeriod = 5 * time.Minute -type PolicyChangeSubscriber interface { +type PolicySubscriber interface { PolicyDataChange(ctx context.Context, repo, group, name, version string) error }