Skip to content
Snippets Groups Projects
Commit b0a105be authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#13 rename struct fields

parent 5fefbeaf
No related branches found
No related tags found
1 merge request!8Sequential Task List executor
Pipeline #51224 passed
...@@ -90,15 +90,17 @@ loop: ...@@ -90,15 +90,17 @@ loop:
case <-ctx.Done(): case <-ctx.Done():
break loop break loop
case <-time.After(l.pollInterval): case <-time.After(l.pollInterval):
sem <- token{} // acquire a semaphore
taskList, err := l.queue.PollList(ctx) taskList, err := l.queue.PollList(ctx)
if err != nil { if err != nil {
if !errors.Is(errors.NotFound, err) { if !errors.Is(errors.NotFound, err) {
l.logger.Error("error getting taskList from queue", zap.Error(err)) l.logger.Error("error getting taskList from queue", zap.Error(err))
} }
<-sem // release the semaphore
continue continue
} }
sem <- token{} // acquire a semaphore
go func(list *tasklist.TaskList) { go func(list *tasklist.TaskList) {
l.Execute(ctx, list) l.Execute(ctx, list)
<-sem // release the semaphore <-sem // release the semaphore
...@@ -120,7 +122,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { ...@@ -120,7 +122,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) {
zap.String("taskListName", list.Name), zap.String("taskListName", list.Name),
) )
list.State = taskpkg.Pending list.State = taskpkg.Pending
list.StartTime = time.Now() list.StartedAt = time.Now()
// execute groups sequentially // execute groups sequentially
for i := range list.Groups { for i := range list.Groups {
...@@ -134,7 +136,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) { ...@@ -134,7 +136,7 @@ func (l *ListExecutor) Execute(ctx context.Context, list *tasklist.TaskList) {
if list.State != taskpkg.Failed { if list.State != taskpkg.Failed {
list.State = taskpkg.Done list.State = taskpkg.Done
} }
list.FinishTime = time.Now() list.FinishedAt = time.Now()
if err := l.storage.SaveTaskListHistory(ctx, list); err != nil { if err := l.storage.SaveTaskListHistory(ctx, list); err != nil {
logger.Error("error saving taskList history", zap.Error(err)) logger.Error("error saving taskList history", zap.Error(err))
......
...@@ -78,7 +78,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq ...@@ -78,7 +78,7 @@ func (s *Service) Create(ctx context.Context, req *goatasklist.CreateTaskListReq
CacheScope: template.CacheScope, CacheScope: template.CacheScope,
CacheNamespace: template.CacheNamespace, CacheNamespace: template.CacheNamespace,
State: task.Created, State: task.Created,
CreateTime: time.Now(), CreatedAt: time.Now(),
} }
// if cache namespace and scope are given, use them instead of the defaults // if cache namespace and scope are given, use them instead of the defaults
......
...@@ -27,9 +27,9 @@ type TaskList struct { ...@@ -27,9 +27,9 @@ type TaskList struct {
Request []byte `json:"request"` Request []byte `json:"request"`
CacheNamespace string `json:"cacheNamespace"` CacheNamespace string `json:"cacheNamespace"`
CacheScope string `json:"cacheScope"` CacheScope string `json:"cacheScope"`
CreateTime time.Time `json:"createdAt"` CreatedAt time.Time `json:"createdAt"`
StartTime time.Time `json:"startedAt"` StartedAt time.Time `json:"startedAt"`
FinishTime time.Time `json:"finishedAt"` FinishedAt time.Time `json:"finishedAt"`
} }
type Group struct { type Group struct {
......
...@@ -279,7 +279,7 @@ func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) { ...@@ -279,7 +279,7 @@ func (s *Storage) PollList(ctx context.Context) (*tasklist.TaskList, error) {
// GetGroupTasks fetches all tasks by a groupID // GetGroupTasks fetches all tasks by a groupID
func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) { func (s *Storage) GetGroupTasks(ctx context.Context, group *tasklist.Group) ([]*task.Task, error) {
filter := bson.M{"groupid": group.ID} filter := bson.M{"groupid": group.ID}
opts := options.Find().SetSort(bson.M{"createdat": 1}) opts := options.Find().SetSort(bson.M{"createdAt": 1})
cursor, err := s.tasks.Find(ctx, filter, opts) cursor, err := s.tasks.Find(ctx, filter, opts)
if err != nil { if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment