diff --git a/internal/service/task/service.go b/internal/service/task/service.go index 024e1112a6e84aac8da03d4e25b48f324027b88c..47a095cbb0bb79314769877587061a25c19c2853 100644 --- a/internal/service/task/service.go +++ b/internal/service/task/service.go @@ -17,7 +17,10 @@ type Storage interface { } type Queue interface { - AddTask(ctx context.Context, task *Task) error + Add(ctx context.Context, task *Task) error + Poll(ctx context.Context) (*Task, error) + Ack(ctx context.Context, task *Task) error + Unack(ctx context.Context, task *Task) error } type Service struct { @@ -56,7 +59,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * task.CreatedAt = time.Now() task.Request = taskRequest - if err := s.queue.AddTask(ctx, task); err != nil { + if err := s.queue.Add(ctx, task); err != nil { logger.Error("error adding task to queue", zap.Error(err)) return nil, errors.New("failed to create task", err) } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index be5eff9b7640316c975dc836fdb4caedaa50f434..09bb73956b7dbe7308121ff9e890ecf1354fc9bb 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -6,6 +6,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "code.vereign.com/gaiax/tsa/golib/errors" "code.vereign.com/gaiax/tsa/task/internal/service/task" @@ -49,7 +50,56 @@ func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task return &task, nil } -func (s *Storage) AddTask(ctx context.Context, task *task.Task) error { +func (s *Storage) Add(ctx context.Context, task *task.Task) error { _, err := s.tasks.InsertOne(ctx, task) return err } + +// Poll retrieves one task from the tasks collection with the +// older ones being retrieved first (FIFO). It updates the state +// of the task to "pending", so that consequent calls to Poll would +// not retrieve the same task. +func (s *Storage) Poll(ctx context.Context) (*task.Task, error) { + opts := options. + FindOneAndUpdate(). + SetSort(bson.M{"createdAt": 1}). + SetReturnDocument(options.After) + + filter := bson.M{"state": task.Created} + update := bson.M{"$set": bson.M{"state": task.Pending}} + result := s.tasks.FindOneAndUpdate( + ctx, + filter, + update, + opts, + ) + + if result.Err() != nil { + if strings.Contains(result.Err().Error(), "no documents in result") { + return nil, errors.New(errors.NotFound, "task not found") + } + return nil, result.Err() + } + + var task task.Task + if err := result.Decode(&task); err != nil { + return nil, err + } + + return &task, nil +} + +// Ack removes a task from the tasks collection. +func (s *Storage) Ack(ctx context.Context, task *task.Task) error { + _, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID}) + return err +} + +// Unack changes the "pending" state of a task to "created", so that +// it can be retrieved for processing again. +func (s *Storage) Unack(ctx context.Context, t *task.Task) error { + filter := bson.M{"id": t.ID} + update := bson.M{"$set": bson.M{"state": task.Created}} + _, err := s.tasks.UpdateOne(ctx, filter, update) + return err +}