From 9542a97c39b3ea47c6c8bcc3e7833828b45c306c Mon Sep 17 00:00:00 2001 From: Yordan Kinkov <yordan.kinkov@vereign.com> Date: Tue, 15 Nov 2022 07:23:39 +0000 Subject: [PATCH] Load external JSON data snapshot from a URL for policy evaluation --- README.md | 29 +- cmd/policy/main.go | 12 + cmd/sync/main.go | 4 +- internal/config/config.go | 5 + .../service/policy/policydata/dataconfig.go | 38 +++ .../policydatafakes/fake_storage.go | 278 ++++++++++++++++++ .../service/policy/policydata/refresher.go | 152 ++++++++++ .../policy/policydata/refresher_test.go | 133 +++++++++ internal/storage/storage.go | 109 ++++++- .../zap/zaptest/observer/logged_entry.go | Bin 0 -> 1596 bytes .../zap/zaptest/observer/observer.go | Bin 0 -> 5725 bytes vendor/modules.txt | Bin 14313 -> 14346 bytes 12 files changed, 748 insertions(+), 12 deletions(-) create mode 100644 internal/service/policy/policydata/dataconfig.go create mode 100644 internal/service/policy/policydata/policydatafakes/fake_storage.go create mode 100644 internal/service/policy/policydata/refresher.go create mode 100644 internal/service/policy/policydata/refresher_test.go create mode 100644 vendor/go.uber.org/zap/zaptest/observer/logged_entry.go create mode 100644 vendor/go.uber.org/zap/zaptest/observer/observer.go diff --git a/README.md b/README.md index 78aa797f..d1717dd7 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,13 @@ the `group`, `policy name` and `version` are directories inside the Git repo and 2. In the same directory there could be a data file containing static JSON, which is automatically available for use during policy evaluation by using the `data` variable. The file *must* be named `data.json`. Example: `/gaiax/example/1.0/data.json` -3. The policy package name inside the policy source code file *must* exactly match +3. In the same directory there could be a configuration file containing information for getting static JSON +data from external URL. The file *must* be named `data-config.json`. +Example: `/gaiax/example/1.0/data-config.json` +> Note that there should only be one of the two files `data.json` or `data-config.json` in the same directory. +> If both files exist in the same directory tha data from the `data.json` file will be eventually overwritten by the data +> acquired using the configuration from the `data-config.json` file. +4. The policy package name inside the policy source code file *must* exactly match the `group` and `policy` (name) of the policy. *What does it mean?* @@ -164,7 +170,26 @@ Example: If the `/gaiax/example/1.0/data.json` file is: ``` one could access the data using `data.name` within the Rego source code. -The 3rd rule for package naming is needed so that a generic evaluation function +The 3rd rule for configuration file is to provide configurations for getting static JSON data from external URL. +The file must contain a URL, an HTTP method and a period, after which an HTTP request is made to get the latest data. +> The period must be added as duration e.g. `10h`, `1h30m` etc. + +The file MAY contain body for the request. +Example file contents: +```json +{ + "url": "http://example.com/data.json?page=3", + "method": "GET", + "period": "10h", + "body": { + "key": "value" + } +} +``` +This means that every 10 hours an HTTP request is going to be made on the given URL, with `GET` method and the result is going +to be stored as static data for this policy and passed during evaluation. + +The 4th rule for package naming is needed so that a generic evaluation function can be mapped and used for evaluating all kinds of different policies. Without a package naming rule, there's no way the service can automatically generate HTTP endpoints for working with arbitrary dynamically uploaded policies. diff --git a/cmd/policy/main.go b/cmd/policy/main.go index ff64fe26..0b26b2f9 100644 --- a/cmd/policy/main.go +++ b/cmd/policy/main.go @@ -37,6 +37,7 @@ import ( "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service" "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/health" "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/policy" + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/policy/policydata" "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/storage" ) @@ -93,6 +94,14 @@ func main() { // subscribe the cache for policy data changes storage.AddPolicyChangeSubscriber(regocache) + // create policy data refresher + dataRefresher := policydata.NewRefresher( + storage, + cfg.Refresher.PollInterval, + httpClient, + logger, + ) + // register rego extension functions { cacheFuncs := regofunc.NewCacheFuncs(cfg.Cache.Addr, oauthClient) @@ -203,6 +212,9 @@ func main() { } return nil }) + g.Go(func() error { + return dataRefresher.Start(ctx) + }) if err := g.Wait(); err != nil { logger.Error("run group stopped", zap.Error(err)) } diff --git a/cmd/sync/main.go b/cmd/sync/main.go index a14cc9b9..42ba5d48 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -296,7 +296,7 @@ func upsert(ctx context.Context, policies []*Policy, db *mongo.Collection) error "data": policy.Data, "dataConfig": policy.DataConfig, "lastUpdate": time.Now(), - "nextConfigExecution": nextConfigExecution(policy), + "nextDataRefreshTime": nextDataRefreshTime(policy), }, }) op.SetUpsert(true) @@ -311,7 +311,7 @@ func upsert(ctx context.Context, policies []*Policy, db *mongo.Collection) error return nil } -func nextConfigExecution(p *Policy) time.Time { +func nextDataRefreshTime(p *Policy) time.Time { if p.DataConfig != "" { return time.Now() } diff --git a/internal/config/config.go b/internal/config/config.go index 1aedbedf..c092a9fa 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ type Config struct { Metrics metricsConfig OCM ocmConfig OAuth oauthConfig + Refresher refresherConfig LogLevel string `envconfig:"LOG_LEVEL" default:"INFO"` } @@ -61,3 +62,7 @@ type oauthConfig struct { ClientSecret string `envconfig:"OAUTH_CLIENT_SECRET" required:"true"` TokenURL string `envconfig:"OAUTH_TOKEN_URL" required:"true"` } + +type refresherConfig struct { + PollInterval time.Duration `envconfig:"REFRESHER_POLL_INTERVAL" default:"10s"` +} diff --git a/internal/service/policy/policydata/dataconfig.go b/internal/service/policy/policydata/dataconfig.go new file mode 100644 index 00000000..fcffacac --- /dev/null +++ b/internal/service/policy/policydata/dataconfig.go @@ -0,0 +1,38 @@ +package policydata + +import ( + "encoding/json" + "time" + + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/golib/errors" +) + +type DataConfig struct { + URL string + Method string + Period Duration + Body interface{} +} + +type Duration time.Duration + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + *d = Duration(time.Duration(value)) + return nil + case string: + tmp, err := time.ParseDuration(value) + if err != nil { + return err + } + *d = Duration(tmp) + return nil + default: + return errors.New("invalid duration") + } +} diff --git a/internal/service/policy/policydata/policydatafakes/fake_storage.go b/internal/service/policy/policydata/policydatafakes/fake_storage.go new file mode 100644 index 00000000..521c346f --- /dev/null +++ b/internal/service/policy/policydata/policydatafakes/fake_storage.go @@ -0,0 +1,278 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package policydatafakes + +import ( + "context" + "sync" + "time" + + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/policy/policydata" + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/storage" +) + +type FakeStorage struct { + GetRefreshPoliciesStub func(context.Context) ([]*storage.Policy, error) + getRefreshPoliciesMutex sync.RWMutex + getRefreshPoliciesArgsForCall []struct { + arg1 context.Context + } + getRefreshPoliciesReturns struct { + result1 []*storage.Policy + result2 error + } + getRefreshPoliciesReturnsOnCall map[int]struct { + result1 []*storage.Policy + result2 error + } + PostponeRefreshStub func(context.Context, []*storage.Policy) error + postponeRefreshMutex sync.RWMutex + postponeRefreshArgsForCall []struct { + arg1 context.Context + arg2 []*storage.Policy + } + postponeRefreshReturns struct { + result1 error + } + postponeRefreshReturnsOnCall map[int]struct { + result1 error + } + UpdateNextRefreshTimeStub func(context.Context, *storage.Policy, time.Time) error + updateNextRefreshTimeMutex sync.RWMutex + updateNextRefreshTimeArgsForCall []struct { + arg1 context.Context + arg2 *storage.Policy + arg3 time.Time + } + updateNextRefreshTimeReturns struct { + result1 error + } + updateNextRefreshTimeReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeStorage) GetRefreshPolicies(arg1 context.Context) ([]*storage.Policy, error) { + fake.getRefreshPoliciesMutex.Lock() + ret, specificReturn := fake.getRefreshPoliciesReturnsOnCall[len(fake.getRefreshPoliciesArgsForCall)] + fake.getRefreshPoliciesArgsForCall = append(fake.getRefreshPoliciesArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.GetRefreshPoliciesStub + fakeReturns := fake.getRefreshPoliciesReturns + fake.recordInvocation("GetRefreshPolicies", []interface{}{arg1}) + fake.getRefreshPoliciesMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStorage) GetRefreshPoliciesCallCount() int { + fake.getRefreshPoliciesMutex.RLock() + defer fake.getRefreshPoliciesMutex.RUnlock() + return len(fake.getRefreshPoliciesArgsForCall) +} + +func (fake *FakeStorage) GetRefreshPoliciesCalls(stub func(context.Context) ([]*storage.Policy, error)) { + fake.getRefreshPoliciesMutex.Lock() + defer fake.getRefreshPoliciesMutex.Unlock() + fake.GetRefreshPoliciesStub = stub +} + +func (fake *FakeStorage) GetRefreshPoliciesArgsForCall(i int) context.Context { + fake.getRefreshPoliciesMutex.RLock() + defer fake.getRefreshPoliciesMutex.RUnlock() + argsForCall := fake.getRefreshPoliciesArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeStorage) GetRefreshPoliciesReturns(result1 []*storage.Policy, result2 error) { + fake.getRefreshPoliciesMutex.Lock() + defer fake.getRefreshPoliciesMutex.Unlock() + fake.GetRefreshPoliciesStub = nil + fake.getRefreshPoliciesReturns = struct { + result1 []*storage.Policy + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) GetRefreshPoliciesReturnsOnCall(i int, result1 []*storage.Policy, result2 error) { + fake.getRefreshPoliciesMutex.Lock() + defer fake.getRefreshPoliciesMutex.Unlock() + fake.GetRefreshPoliciesStub = nil + if fake.getRefreshPoliciesReturnsOnCall == nil { + fake.getRefreshPoliciesReturnsOnCall = make(map[int]struct { + result1 []*storage.Policy + result2 error + }) + } + fake.getRefreshPoliciesReturnsOnCall[i] = struct { + result1 []*storage.Policy + result2 error + }{result1, result2} +} + +func (fake *FakeStorage) PostponeRefresh(arg1 context.Context, arg2 []*storage.Policy) error { + var arg2Copy []*storage.Policy + if arg2 != nil { + arg2Copy = make([]*storage.Policy, len(arg2)) + copy(arg2Copy, arg2) + } + fake.postponeRefreshMutex.Lock() + ret, specificReturn := fake.postponeRefreshReturnsOnCall[len(fake.postponeRefreshArgsForCall)] + fake.postponeRefreshArgsForCall = append(fake.postponeRefreshArgsForCall, struct { + arg1 context.Context + arg2 []*storage.Policy + }{arg1, arg2Copy}) + stub := fake.PostponeRefreshStub + fakeReturns := fake.postponeRefreshReturns + fake.recordInvocation("PostponeRefresh", []interface{}{arg1, arg2Copy}) + fake.postponeRefreshMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) PostponeRefreshCallCount() int { + fake.postponeRefreshMutex.RLock() + defer fake.postponeRefreshMutex.RUnlock() + return len(fake.postponeRefreshArgsForCall) +} + +func (fake *FakeStorage) PostponeRefreshCalls(stub func(context.Context, []*storage.Policy) error) { + fake.postponeRefreshMutex.Lock() + defer fake.postponeRefreshMutex.Unlock() + fake.PostponeRefreshStub = stub +} + +func (fake *FakeStorage) PostponeRefreshArgsForCall(i int) (context.Context, []*storage.Policy) { + fake.postponeRefreshMutex.RLock() + defer fake.postponeRefreshMutex.RUnlock() + argsForCall := fake.postponeRefreshArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeStorage) PostponeRefreshReturns(result1 error) { + fake.postponeRefreshMutex.Lock() + defer fake.postponeRefreshMutex.Unlock() + fake.PostponeRefreshStub = nil + fake.postponeRefreshReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) PostponeRefreshReturnsOnCall(i int, result1 error) { + fake.postponeRefreshMutex.Lock() + defer fake.postponeRefreshMutex.Unlock() + fake.PostponeRefreshStub = nil + if fake.postponeRefreshReturnsOnCall == nil { + fake.postponeRefreshReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.postponeRefreshReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) UpdateNextRefreshTime(arg1 context.Context, arg2 *storage.Policy, arg3 time.Time) error { + fake.updateNextRefreshTimeMutex.Lock() + ret, specificReturn := fake.updateNextRefreshTimeReturnsOnCall[len(fake.updateNextRefreshTimeArgsForCall)] + fake.updateNextRefreshTimeArgsForCall = append(fake.updateNextRefreshTimeArgsForCall, struct { + arg1 context.Context + arg2 *storage.Policy + arg3 time.Time + }{arg1, arg2, arg3}) + stub := fake.UpdateNextRefreshTimeStub + fakeReturns := fake.updateNextRefreshTimeReturns + fake.recordInvocation("UpdateNextRefreshTime", []interface{}{arg1, arg2, arg3}) + fake.updateNextRefreshTimeMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) UpdateNextRefreshTimeCallCount() int { + fake.updateNextRefreshTimeMutex.RLock() + defer fake.updateNextRefreshTimeMutex.RUnlock() + return len(fake.updateNextRefreshTimeArgsForCall) +} + +func (fake *FakeStorage) UpdateNextRefreshTimeCalls(stub func(context.Context, *storage.Policy, time.Time) error) { + fake.updateNextRefreshTimeMutex.Lock() + defer fake.updateNextRefreshTimeMutex.Unlock() + fake.UpdateNextRefreshTimeStub = stub +} + +func (fake *FakeStorage) UpdateNextRefreshTimeArgsForCall(i int) (context.Context, *storage.Policy, time.Time) { + fake.updateNextRefreshTimeMutex.RLock() + defer fake.updateNextRefreshTimeMutex.RUnlock() + argsForCall := fake.updateNextRefreshTimeArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeStorage) UpdateNextRefreshTimeReturns(result1 error) { + fake.updateNextRefreshTimeMutex.Lock() + defer fake.updateNextRefreshTimeMutex.Unlock() + fake.UpdateNextRefreshTimeStub = nil + fake.updateNextRefreshTimeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) UpdateNextRefreshTimeReturnsOnCall(i int, result1 error) { + fake.updateNextRefreshTimeMutex.Lock() + defer fake.updateNextRefreshTimeMutex.Unlock() + fake.UpdateNextRefreshTimeStub = nil + if fake.updateNextRefreshTimeReturnsOnCall == nil { + fake.updateNextRefreshTimeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateNextRefreshTimeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getRefreshPoliciesMutex.RLock() + defer fake.getRefreshPoliciesMutex.RUnlock() + fake.postponeRefreshMutex.RLock() + defer fake.postponeRefreshMutex.RUnlock() + fake.updateNextRefreshTimeMutex.RLock() + defer fake.updateNextRefreshTimeMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeStorage) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ policydata.Storage = new(FakeStorage) diff --git a/internal/service/policy/policydata/refresher.go b/internal/service/policy/policydata/refresher.go new file mode 100644 index 00000000..c5a7127c --- /dev/null +++ b/internal/service/policy/policydata/refresher.go @@ -0,0 +1,152 @@ +package policydata + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "time" + + "go.uber.org/zap" + + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/golib/errors" + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/storage" +) + +//go:generate counterfeiter . Storage + +type Storage interface { + GetRefreshPolicies(ctx context.Context) ([]*storage.Policy, error) + PostponeRefresh(ctx context.Context, policies []*storage.Policy) error + UpdateNextRefreshTime(ctx context.Context, p *storage.Policy, nextDataRefreshTime time.Time) error +} + +type Refresher struct { + storage Storage + pollInterval time.Duration + + httpClient *http.Client + logger *zap.Logger +} + +func NewRefresher( + storage Storage, + pollInterval time.Duration, + httpClient *http.Client, + logger *zap.Logger, +) *Refresher { + return &Refresher{ + storage: storage, + pollInterval: pollInterval, + httpClient: httpClient, + logger: logger, + } +} + +func (e *Refresher) Start(ctx context.Context) error { + defer e.logger.Info("policy data refresher stopped") + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(e.pollInterval): + policies, err := e.storage.GetRefreshPolicies(ctx) + if err != nil { + if !errors.Is(errors.NotFound, err) { + e.logger.Error("error getting policies for data refresh from storage", zap.Error(err)) + } + continue + } + for _, policy := range policies { + e.Execute(ctx, policy) + } + } + } + + return ctx.Err() +} + +func (e *Refresher) Execute(ctx context.Context, p *storage.Policy) { + logger := e.logger.With( + zap.String("policyName", p.Name), + zap.String("policyGroup", p.Group), + zap.String("policyVersion", p.Version), + ) + + var config DataConfig + if err := json.Unmarshal([]byte(p.DataConfig), &config); err != nil { + // data configuration is corrupted, set next refresh time to Go's zero date + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Time{}) + logger.Error("error unmarshalling data configuration", zap.Error(err)) + return + } + if config.URL == "" || config.Period == Duration(0) || config.Method == "" { + // data configuration is missing required fields, set next refresh time to Go's zero date + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Time{}) + logger.Error("required fields are missing in data configuration") + return + } + + req, err := e.createHTTPRequest(ctx, &config) + if err != nil { + // cannot create a request, set next refresh time to Go's zero date + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Time{}) + logger.Error("error creating an http request", zap.Error(err)) + return + } + + resp, err := e.httpClient.Do(req) + if err != nil { + // making data configuration request failed, set next refresh time to current time added data config's period + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Now().Add(time.Duration(config.Period))) + logger.Error("error making a data refresh request", zap.Error(err)) + return + } + defer resp.Body.Close() // nolint:errcheck + + if resp.StatusCode != http.StatusOK { + // unexpected response on data refresh request, set next refresh time to current time added data config's period + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Now().Add(time.Duration(config.Period))) + logger.Error("unexpected response on data refresh request", zap.Int("response code", resp.StatusCode)) + return + } + + dataBytes, err := io.ReadAll(resp.Body) + if err != nil { + // error reading response from data refresh request, set next refresh time to current time added data config's period + _ = e.storage.UpdateNextRefreshTime(ctx, p, time.Now().Add(time.Duration(config.Period))) + logger.Error("error reading response from data refresh request", zap.Error(err)) + return + } + + p.Data = string(dataBytes) + if err = e.storage.UpdateNextRefreshTime(ctx, p, time.Now().Add(time.Duration(config.Period))); err != nil { + logger.Error("error updating data after successful refresh request", zap.Error(err)) + return + } + logger.Debug("data refresh is successfully executed") +} + +func (e *Refresher) createHTTPRequest(ctx context.Context, config *DataConfig) (*http.Request, error) { + bodyBytes, err := json.Marshal(config.Body) + if err != nil { + return nil, errors.New("error marshaling data configuration body") + } + + url, err := url.Parse(config.URL) + if err != nil { + return nil, errors.New("invalid data configuration url") + } + if url.Scheme == "" { + url.Scheme = "https" + } + + if config.Method == http.MethodPost { + return http.NewRequestWithContext(ctx, config.Method, url.String(), bytes.NewReader(bodyBytes)) + } + return http.NewRequestWithContext(ctx, config.Method, url.String(), nil) +} diff --git a/internal/service/policy/policydata/refresher_test.go b/internal/service/policy/policydata/refresher_test.go new file mode 100644 index 00000000..59492f0c --- /dev/null +++ b/internal/service/policy/policydata/refresher_test.go @@ -0,0 +1,133 @@ +package policydata_test + +import ( + "context" + "errors" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/policy/policydata" + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/service/policy/policydata/policydatafakes" + "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/policy/internal/storage" +) + +type RoundTripFunc func(req *http.Request) *http.Response + +func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req), nil +} + +// NewTestClient returns *http.Client with Transport replaced to avoid making real calls +func NewTestClient(fn RoundTripFunc) *http.Client { + return &http.Client{ + Transport: fn, + } +} + +func Test_Execute(t *testing.T) { + tests := []struct { + // test input + name string + statusCode int + policy storage.Policy + storage policydata.Storage + // expected result + logCnt int + firstLog string + }{ + { + name: "invalid data configuration", + policy: storage.Policy{DataConfig: "<invalid data configuration>"}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return nil + }, + }, + logCnt: 1, + firstLog: "error unmarshalling data configuration", + }, + { + name: "data configuration is missing required fields", + policy: storage.Policy{DataConfig: `{"url": "https://example.com"}`}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return nil + }, + }, + logCnt: 1, + firstLog: "required fields are missing in data configuration", + }, + { + name: "error making an http request", + policy: storage.Policy{DataConfig: `{"url": "htt//example.com", "method": "GET", "period": "1h"}`}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return nil + }, + }, + logCnt: 1, + firstLog: "error making a data refresh request", + }, + { + name: "unexpected response code", + statusCode: 500, + policy: storage.Policy{DataConfig: `{"url": "https://example.com", "method": "GET", "period": "1h"}`}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return nil + }, + }, + logCnt: 1, + firstLog: "unexpected response on data refresh request", + }, + { + name: "error updating data after successful refresh request", + policy: storage.Policy{DataConfig: `{"url": "https://example.com", "method": "GET", "period": "1h"}`}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return errors.New("storage error") + }, + }, + logCnt: 1, + firstLog: "error updating data after successful refresh request", + }, + { + name: "data refresh is successfully executed", + policy: storage.Policy{DataConfig: `{"url": "https://example.com", "method": "GET", "period": "1h"}`}, + storage: &policydatafakes.FakeStorage{ + UpdateNextRefreshTimeStub: func(ctx context.Context, policy *storage.Policy, t time.Time) error { + return nil + }, + }, + logCnt: 0, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + observedZapCore, observedLogs := observer.New(zap.ErrorLevel) + logger := zap.New(observedZapCore) + httpClient := http.DefaultClient + if test.statusCode != 0 { + httpClient = NewTestClient(func(req *http.Request) *http.Response { + return &http.Response{ + StatusCode: test.statusCode, + } + }) + } + refresher := policydata.NewRefresher(test.storage, time.Duration(0), httpClient, logger) + refresher.Execute(context.Background(), &test.policy) + + assert.Equal(t, test.logCnt, observedLogs.Len()) + if observedLogs.Len() > 0 { + firstLog := observedLogs.All()[0] + assert.Equal(t, test.firstLog, firstLog.Message) + } + }) + } +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index fed1d6a1..2901bfe0 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -6,28 +6,39 @@ import ( "time" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" zap "go.uber.org/zap" "gitlab.com/gaia-x/data-infrastructure-federation-services/tsa/golib/errors" ) +const ( + dataField = "data" + nextDataRefreshTimeField = "nextDataRefreshTime" + refreshPostponePeriod = 5 * time.Minute +) + type PolicyChangeSubscriber interface { PolicyDataChange() } type Policy struct { - Filename string - Name string - Group string - Version string - Rego string - Data string - Locked bool - LastUpdate time.Time + ID primitive.ObjectID `bson:"_id"` + Filename string + Name string + Group string + Version string + Rego string + Data string + DataConfig string + Locked bool + LastUpdate time.Time + NextDataRefreshTime time.Time } type Storage struct { + db *mongo.Client policy *mongo.Collection subscriber PolicyChangeSubscriber logger *zap.Logger @@ -39,6 +50,7 @@ func New(db *mongo.Client, dbname, collection string, logger *zap.Logger) (*Stor } return &Storage{ + db: db, policy: db.Database(dbname).Collection(collection), logger: logger, }, nil @@ -109,3 +121,84 @@ func (s *Storage) ListenPolicyDataChanges(ctx context.Context) error { func (s *Storage) AddPolicyChangeSubscriber(subscriber PolicyChangeSubscriber) { s.subscriber = subscriber } + +func (s *Storage) GetRefreshPolicies(ctx context.Context) ([]*Policy, error) { + // create a callback for the mongodb transaction + callback := func(mCtx mongo.SessionContext) (interface{}, error) { + filter := bson.M{nextDataRefreshTimeField: bson.M{ + "$gt": time.Time{}, // greater than the Go's zero date + "$lte": time.Now(), + }} + + cursor, err := s.policy.Find(ctx, filter) + if err != nil { + return nil, err + } + + var policies []*Policy + if err := cursor.All(ctx, &policies); err != nil { + return nil, err + } + if len(policies) == 0 { + return nil, errors.New(errors.NotFound, "policies for data refresh not found") + } + + err = s.PostponeRefresh(ctx, policies) + if err != nil { + return nil, err + } + + return policies, nil + } + + // execute transaction + res, err := s.Transaction(ctx, callback) + if err != nil { + return nil, err + } + policies, _ := res.([]*Policy) + + return policies, nil +} + +// PostponeRefresh adds a refreshPostponePeriod Duration to each policy's +// nextDataRefreshTimeField in order to prevent concurrent data refresh +func (s *Storage) PostponeRefresh(ctx context.Context, policies []*Policy) error { + var ids []*primitive.ObjectID + for _, p := range policies { + ids = append(ids, &p.ID) + } + + filter := bson.M{"_id": bson.M{"$in": ids}} + update := bson.M{"$set": bson.M{nextDataRefreshTimeField: time.Now().Add(refreshPostponePeriod)}} + _, err := s.policy.UpdateMany(ctx, filter, update) + + return err +} + +// UpdateNextRefreshTime updates policy's data and nextDataRefreshTimeField fields +func (s *Storage) UpdateNextRefreshTime(ctx context.Context, p *Policy, nextDataRefreshTime time.Time) error { + filter := bson.M{"_id": p.ID} + update := bson.M{"$set": bson.M{ + nextDataRefreshTimeField: nextDataRefreshTime, + dataField: p.Data, + }} + _, err := s.policy.UpdateOne(ctx, filter, update) + + return err +} + +func (s *Storage) Transaction(ctx context.Context, callback func(mCtx mongo.SessionContext) (interface{}, error)) (interface{}, error) { + session, err := s.db.StartSession() + if err != nil { + return nil, errors.New("failed creating session", err) + } + defer session.EndSession(ctx) + + res, err := session.WithTransaction(ctx, callback) + if err != nil { + return nil, errors.New("failed executing transaction", err) + } + + return res, nil +} diff --git a/vendor/go.uber.org/zap/zaptest/observer/logged_entry.go b/vendor/go.uber.org/zap/zaptest/observer/logged_entry.go new file mode 100644 index 0000000000000000000000000000000000000000..a4ea7ec36c1e4162f9a56e17183d78be218c4147 GIT binary patch literal 1596 zcmZ8hO>g5i5Isk~V&IblsCL^+fdzU{Y&j+>ONK0O8Y~tCB~db4nKVRd({z#l-Z!$G zA_Zg^Bxl~okTc6At?cEs*X_yEyt=2KfBfkeI+m)ZLRBYYJKJjILK3E01j{8p+N=Jo zozvEkc63s`DqpGXrSYl`sp*v>+fa3qeXBz9mZW*5i|QQ`Z0V&o+O#C80x{zu*q?yp zY~x>KuMl68bk0^<0+MQ54QFM%^i0|4PC1(UlcL#S%9!1cJhhS?18M_6MYmIW(f(uy zPrY)!*A>Hr2&%fFW+k`dPM`He!s%m89OJ>l;9!LXhjg~JZunn~{w_n=X?F^#){I^b z9+8eWD`hwfZkDzur@B!bKv2-D>Hjp2VN8TNLB&)q$2%`4d;aK0kE)uXHy}oV5L8=~ zZ{+<`Ri1a)LSwtmzOcv2np(4Q_X&>JRDp9@+HY#Kb$tNFdZ-@jKi<eF=6ci9kvqw* zqf$-f1GzSg#=AFX@H&*v<2`8E(Z%*7@Bba&UQ@A+>5y%Trznp}9B7|spT#;}(=0k* zJqzhc6x-}r5F+v@EuJacP?SE?Zz5fX6#u@@<HLcnoOOuZJ`pjtMY>9kYmq+E15(qh zphWCM0o)=Z=A6((jMNS7;(WCQc=R9=Q9Or?wGl<i$eS#ui1txlh}AKPa@rsBeRhZ; zVh!X}q?;T>@h(n_1&Fan@n<aPu#J+0*%&T5!bZ+QX_f7tbMd$>XqzSL7@H3<^hFOz zd}V{XRT7C^Nb6`9J+j7}G60PxIr1u#p0+XXGHZlyRfsHQKdUS)ax6pG&5QT!Cvk{F zigIz_%53s%H#+6&A_*M8L24Rbfw=NxvEDBTAzZv8+}{+@I*t;+;kHut$>+EDMG#zM z^_Ofh5=y7~Z>kRhedaeov(_#K44Q@Q+vPv<!oSM)Y8H&uGL+c1RdsB9|H^|#8d9c0 z^O^Ps*&6HcR?*TIboKjiz$lS~k%i`T4y^phXron!f$~l2PL{gUeuP6uc=flZT3wW> z@mLpuf4!&=tZ3)~-Sj=Uo3t%PHNo9#O8)Zob}*6zZ=?3<bSE#+=h3DRPiL$e%mMC{ zh7~!UQ&Z?7Xa-Z!`~&^{?G!-sdj|Nz6UMY(HKg~Atkm~67B`j!6Z_Mzbd!^+7i486 zPlK4NruzAPaK{4vz>MZl3~XGBTHL$<=&o5rbzRu`gouo{;O^?-T6|pB=+wdLzg1cR A!T<mO literal 0 HcmV?d00001 diff --git a/vendor/go.uber.org/zap/zaptest/observer/observer.go b/vendor/go.uber.org/zap/zaptest/observer/observer.go new file mode 100644 index 0000000000000000000000000000000000000000..f77f1308baf29fa3699df5d3cb0eb07763f50d9b GIT binary patch literal 5725 zcmb_gO>^6}7F|of;*J-6RI1Qv7H{DhFBqAQIh7?3Mb|Hr$z&)Jl2B8GN|1KcO#XY% z1wegSuG79`lqdu*F3!2%(CH}+)M{PH`JyH->(Sdczx?<0+c$6D((P1Kl!$CmszS}B z(0z)^%<rC_;zKv0T1u^@DoJWuh)PV?G_TUK7I~j$l@O$6lr7R~F8Wj}O3O8^M5VDo zP3u&avYb;&8BS+?5Uv-{q}8l`Oe+EToKme-CR3=STxHEtlyzG3kuzBcO<uhabTV-} zPI~5?T%-jnlqD1jYTweMtQV@OsS>)bWX5Xx5Xy=sXC&?GLN28{gtr^RXx0Y`jRqA4 z+^40=<&3|D;kRn0h182a<&w2en;Mdu7c)`vF0eUO6=_iza8Lq+TGCsLMvM=Foq)ob zOY_R(LM`|CNP}wDR3#3EfgqGCn9rR1yU1!@VG1)<6zY*VW~$636W3>8#H13uPp9fZ z7+N+8N>u~3vA^+%0b@PwUg<?z6f_mid^lW|tc_P&X5iXPb&YtCX+bMhnR5>rx6zbb zg)|voCU-#`QZ%8Pc>G&*5nj+qFv0UlpYEdMYJ8gzB;sI{e4z0q1)~r2Yc#s(Q~1A| zIGjvq95asS`eqn~SdK=6;q65<dQb1Lbu><B7+psR^d@8CGo7|5#MVo?4&%WUln3vk zVU&F6v#!f18L{TeaZCZ-1aT4#ZihikH@ET4coG7{1vHPM(PfN-!s~F9_&6AA6#j+> znp_3LA)m(Tf?Lpt87K|LHy`5Y{Z&F&<Kab!#djg_1@DHTJq_#z!yvlu(?xI{yl0Ft zjiJbpWJwE??yf>!<x>NE21zs?F`vPBl*D-KgKnH`cHc#luunl8P1rJ*@%Y*>W$R)S z&VU|l9fejATizIJ;~<Ez@ovGsMMM{2FoZgIYs5VHeJk&GdF3X}?$bG{!c>du0i|VC zsRx-O3sU+sU1h2g{s8|VX49JPg;?PYic9%=DVC~Q8;qjNka*?%bviGVMw}v@SIFp1 zVL%x{u_8QINH92}{*UIooizm#p<*EEYK62kbp)x!hdm-nIa`_%NmrvjaNfwuXhO+Y zp=z|{p<Z($nb)j~g-w2@v`P`;SyQAHxjijdzq>l*j;(UZNkJ!b<u|CbzN+S@umk^P zt*5P>PP$zu>UBFOTwTh!KH>ka%Z$HkxfBrUJkcaki>op%Sm?+pZ1IzB&p2mnostjL zTys6=x?E;WRe{6$wN7U!VXDee<#W6#3X~Tv!?3o4nX|F9->ugxL3`SOxIr=c+U+bG zB4X5je0Pls`K8<8P4w{-j-87<gwxjDZwAm%lyGg0>c&12R5nX)M>tT{VhN14x^J=g z-C0v+<SBZ!dwh?e$?HMhKv4d&@#CS&?!8{OQ_|Tv6?m_F27(p){<bWfgoQw*LlvOF zwWw_CVnkf_7~_E{p1}hhz_Z7hjkAvohcLEfdM~^^(f%%80OoGwK^pFPfdhmbiv(K# z0Tb7iDwr3Y*t(;o$~mC9$vta+ci6L(TVFWFmIa%`gvL3QvT(+E;*~8Uu(?H}St2^I zXoBVi_w)%pn$Y;F#|jZ;STUKF7_m0AuO+Gw=e~LN6D*>h{3$9$4{6Z|9~~6wf{&{) zj|&7Y+=e7IwML`Z+0Mri%hlppOKnLUGuFmFCVsjd6f=?>BN(|*RrxidsDtm1@{<8d z=UdQUzjZtKSmUs;3x!KrphgVEgDAo;Dbf^!6&7?`K<x~nwBYnciV~`&V8LQ-LVhkE zz_;N2&+<Av!!zQWnr{y8?Po1haafga_j{a9o}isl(W9wSMN1oAC*d1ybk1oFOd#Fc zV|p#L<|gzkP16q+DF!0WXuT4doJsITF!06~NILc2QqRd&>K{g|MGk7v{v5GMDOW3m z!gugXTTWc++j)GSw6@y5kF-15bDMrFf7kXfAQ-6H6TUMD#$AgN#qhw{mr@k@_n2az zHT+%VFK{-hJtMgZWV81F`kT}GGcFnR7o%>Q;5;0mpMSAw-jTRlIT=6vqe%;G(ez9@ zm^#1-f!-@n6*%|JUzb!f$YcGOy?zzze{QmSvHl)|LAM9d9~2t@##GD>8N?vCTK+pb z9qBRVqO?1A>?V~eO||4I>gG~z4GjNrAv3fVR2nx7BV1AD%xq;{g~kLIzF=G!RjkZT z?@4()5|I0&MlNPNSee0S(_*`w2h6dyilF-KjUcx7AV`_adK;mxWwTuooeaq{5-ub+ z%FcNL7iErs<zB--7g#wPquVz1)2>0k@;q?1f%J{b`ZE(&n&%#*_ge*E8IJ9|u^BKN z82gOzO3m9jaAOTSl8(e9WtA{9P)a49w%3(uGlLbi8G?B>wO1DM+G_*<_|aYmdVJ}% z7nALs0TJRRh=Vx+r^1ZsbW>(bhYPV%h^j~4?$xQk-$jk#(I7D-?3;4O#9aGPRGg(N zcHZs*XF#1!`&hN+#{h5sr~+%>Fmx8|2Q%~^XyPPybAJnQSFs!Yp=@tl=YiH%xsRXP zZKe?NF&vV(1AX3@!r0#-;;(b^ULC;bc^Dw!(9e7WFt;H)BJgd)nDB2l;c8?fEW@SZ zP%rKk7%(uYwJ^=|&fv1QKAw4*vM0B$+XnDa^@D}LB?XDGIUwY8Lu(N9s?}|T+{For zer%Z$akEgqJve8b7Ku$v*Ag(o2YJpGWdZ(~iyP`@49mm|c<(SI+ZMzZx)zD+iS46t z7y!95e>yPg)95=e+S5RU07oXjaLUeXK)AHVd$<1ajBk3|=icG-C;V*5tzY}T-{bIp zi7{Usj(Nyxd1?hwRjBzOnc~8@z1_9OF}CW{8+*mGN_syZfQD0e_ojy8fqm7IWa!5v sw!}EYjLCd1lf&)B`>Sy~^kFJ!9mJQfO`Sa&rW0J}kpvrR&a-yoUybW*-~a#s literal 0 HcmV?d00001 diff --git a/vendor/modules.txt b/vendor/modules.txt index d89f7493d4e8b469894bad11de6f6e18926e9768..3ba5a11ff2e5cdc00a682f5607bfc9e33d09ba15 100644 GIT binary patch delta 30 lcmaEv-&L?dOLMZ2rZ8_wYH^8veo}F2QCVuyW=+ild;qr~3vK`a delta 12 TcmeAwc$vRJOLMcg=6*f^CZh#P -- GitLab