From b0a105bef4a1bc95dc7129df556090b9ca5a52f6 Mon Sep 17 00:00:00 2001 From: Yordan Kinkov <yordan.kinkov@vereign.com> Date: Thu, 9 Jun 2022 10:30:11 +0300 Subject: [PATCH] #13 rename struct fields --- internal/listexecutor/listexecutor.go | 8 +++++--- internal/service/tasklist/service.go | 2 +- internal/service/tasklist/task_list.go | 6 +++--- internal/storage/storage.go | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go index 09f5de6..530e0ee 100644 --- a/internal/listexecutor/listexecutor.go +++ b/internal/listexecutor/listexecutor.go @@ -90,15 +90,17 @@ loop: case <-ctx.Done(): break loop case <-time.After(l.pollInterval): + sem <- token{} // acquire a semaphore + taskList, err := l.queue.PollList(ctx) if err != nil { if !errors.Is(errors.NotFound, err) { l.logger.Error("error getting taskList from queue", zap.Error(err)) } + <-sem // release the semaphore continue } - sem <- token{} // acquire a semaphore go func(list *tasklist.TaskList) { l.Execute(ctx, list) <-sem // release the semaphore @@ -120,7 +122,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { zap.String("taskListName", list.Name), ) list.State = taskpkg.Pending - list.StartTime = time.Now() + list.StartedAt = time.Now() // execute groups sequentially for i := range list.Groups { @@ -134,7 +136,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { if list.State != taskpkg.Failed { list.State = taskpkg.Done } - list.FinishTime = time.Now() + list.FinishedAt = time.Now() if err := l.storage.SaveTaskListHistory(ctx, list); err != nil { logger.Error("error saving taskList history", zap.Error(err)) diff --git a/internal/service/tasklist/service.go b/internal/service/tasklist/service.go index bc996f5..5542a7d 100644 --- a/internal/service/tasklist/service.go +++ b/internal/service/tasklist/service.go @@ -78,7 +78,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq CacheScope: template.CacheScope, CacheNamespace: template.CacheNamespace, State: task.Created, - CreateTime: time.Now(), + CreatedAt: time.Now(), } // if cache namespace and scope are given, use them instead of the defaults diff --git a/internal/service/tasklist/task_list.go b/internal/service/tasklist/task_list.go index 065e720..0c9827e 100644 --- a/internal/service/tasklist/task_list.go +++ b/internal/service/tasklist/task_list.go @@ -27,9 +27,9 @@ type TaskList struct { Request []byte `json:"request"` CacheNamespace string `json:"cacheNamespace"` CacheScope string `json:"cacheScope"` - CreateTime time.Time `json:"createdAt"` - StartTime time.Time `json:"startedAt"` - FinishTime time.Time `json:"finishedAt"` + CreatedAt time.Time `json:"createdAt"` + StartedAt time.Time `json:"startedAt"` + FinishedAt time.Time `json:"finishedAt"` } type Group struct { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index d76f613..cc16df1 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -279,7 +279,7 @@ func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) { // GetGroupTasks fetches all tasks by a groupID func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) { filter := bson.M{"groupid": group.ID} - opts := options.Find().SetSort(bson.M{"createdat": 1}) + opts := options.Find().SetSort(bson.M{"createdAt": 1}) cursor, err := s.tasks.Find(ctx, filter, opts) if err != nil { -- GitLab