diff --git a/internal/listexecutor/listexecutor.go b/internal/listexecutor/listexecutor.go index 09f5de6ffc2222c15b4fffe0a692e34bb6ae4a2b..530e0eee6c67926ef6f7dab60241daad6c861b04 100644 --- a/internal/listexecutor/listexecutor.go +++ b/internal/listexecutor/listexecutor.go @@ -90,15 +90,17 @@ loop: case <-ctx.Done(): break loop case <-time.After(l.pollInterval): + sem <- token{} // acquire a semaphore + taskList, err := l.queue.PollList(ctx) if err != nil { if !errors.Is(errors.NotFound, err) { l.logger.Error("error getting taskList from queue", zap.Error(err)) } + <-sem // release the semaphore continue } - sem <- token{} // acquire a semaphore go func(list *tasklist.TaskList) { l.Execute(ctx, list) <-sem // release the semaphore @@ -120,7 +122,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { zap.String("taskListName", list.Name), ) list.State = taskpkg.Pending - list.StartTime = time.Now() + list.StartedAt = time.Now() // execute groups sequentially for i := range list.Groups { @@ -134,7 +136,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { if list.State != taskpkg.Failed { list.State = taskpkg.Done } - list.FinishTime = time.Now() + list.FinishedAt = time.Now() if err := l.storage.SaveTaskListHistory(ctx, list); err != nil { logger.Error("error saving taskList history", zap.Error(err)) diff --git a/internal/service/tasklist/service.go b/internal/service/tasklist/service.go index bc996f5bfdbdd9978fd581edd1c13ddb5f9aca7a..5542a7d24eace9e0d4782eea171151a4a46b4603 100644 --- a/internal/service/tasklist/service.go +++ b/internal/service/tasklist/service.go @@ -78,7 +78,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq CacheScope: template.CacheScope, CacheNamespace: template.CacheNamespace, State: task.Created, - CreateTime: time.Now(), + CreatedAt: time.Now(), } // if cache namespace and scope are given, use them instead of the defaults diff --git a/internal/service/tasklist/task_list.go b/internal/service/tasklist/task_list.go index 065e72071705c22644d84d0143862846e4c045a2..0c9827e01d4555f09366b1b2808e51c4aa7c7d60 100644 --- a/internal/service/tasklist/task_list.go +++ b/internal/service/tasklist/task_list.go @@ -27,9 +27,9 @@ type TaskList struct { Request []byte `json:"request"` CacheNamespace string `json:"cacheNamespace"` CacheScope string `json:"cacheScope"` - CreateTime time.Time `json:"createdAt"` - StartTime time.Time `json:"startedAt"` - FinishTime time.Time `json:"finishedAt"` + CreatedAt time.Time `json:"createdAt"` + StartedAt time.Time `json:"startedAt"` + FinishedAt time.Time `json:"finishedAt"` } type Group struct { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index d76f613471b0e24f1222a40b79e07c188c0caece..cc16df1cd4392949218e17a91b7ab8995395864d 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -279,7 +279,7 @@ func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) { // GetGroupTasks fetches all tasks by a groupID func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) { filter := bson.M{"groupid": group.ID} - opts := options.Find().SetSort(bson.M{"createdat": 1}) + opts := options.Find().SetSort(bson.M{"createdAt": 1}) cursor, err := s.tasks.Find(ctx, filter, opts) if err != nil {