Skip to content
Snippets Groups Projects
Commit 2bfcd368 authored by Lyuben Penkovski's avatar Lyuben Penkovski
Browse files

Queue-like interface for retrieving tasks from storage

parent b1ac1880
Branches
Tags
1 merge request!3Queue-like interface for retrieving tasks from storage
Pipeline #49872 passed
...@@ -17,7 +17,10 @@ type Storage interface { ...@@ -17,7 +17,10 @@ type Storage interface {
} }
type Queue interface { type Queue interface {
AddTask(ctx context.Context, task *Task) error Add(ctx context.Context, task *Task) error
Poll(ctx context.Context) (*Task, error)
Ack(ctx context.Context, task *Task) error
Unack(ctx context.Context, task *Task) error
} }
type Service struct { type Service struct {
...@@ -56,7 +59,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res * ...@@ -56,7 +59,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
task.CreatedAt = time.Now() task.CreatedAt = time.Now()
task.Request = taskRequest task.Request = taskRequest
if err := s.queue.AddTask(ctx, task); err != nil { if err := s.queue.Add(ctx, task); err != nil {
logger.Error("error adding task to queue", zap.Error(err)) logger.Error("error adding task to queue", zap.Error(err))
return nil, errors.New("failed to create task", err) return nil, errors.New("failed to create task", err)
} }
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"code.vereign.com/gaiax/tsa/golib/errors" "code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task" "code.vereign.com/gaiax/tsa/task/internal/service/task"
...@@ -49,7 +50,56 @@ func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task ...@@ -49,7 +50,56 @@ func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task
return &task, nil return &task, nil
} }
func (s *Storage) AddTask(ctx context.Context, task *task.Task) error { func (s *Storage) Add(ctx context.Context, task *task.Task) error {
_, err := s.tasks.InsertOne(ctx, task) _, err := s.tasks.InsertOne(ctx, task)
return err return err
} }
// Poll retrieves one task from the tasks collection with the
// older ones being retrieved first (FIFO). It updates the state
// of the task to "pending", so that consequent calls to Poll would
// not retrieve the same task.
func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
opts := options.
FindOneAndUpdate().
SetSort(bson.M{"createdAt": 1}).
SetReturnDocument(options.After)
filter := bson.M{"state": task.Created}
update := bson.M{"$set": bson.M{"state": task.Pending}}
result := s.tasks.FindOneAndUpdate(
ctx,
filter,
update,
opts,
)
if result.Err() != nil {
if strings.Contains(result.Err().Error(), "no documents in result") {
return nil, errors.New(errors.NotFound, "task not found")
}
return nil, result.Err()
}
var task task.Task
if err := result.Decode(&task); err != nil {
return nil, err
}
return &task, nil
}
// Ack removes a task from the tasks collection.
func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
_, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID})
return err
}
// Unack changes the "pending" state of a task to "created", so that
// it can be retrieved for processing again.
func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
filter := bson.M{"id": t.ID}
update := bson.M{"$set": bson.M{"state": task.Created}}
_, err := s.tasks.UpdateOne(ctx, filter, update)
return err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment