package storage

import (
	"context"
	stderrors "errors"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"

	"code.vereign.com/gaiax/tsa/golib/errors"
	"code.vereign.com/gaiax/tsa/task/internal/service/task"
)

// Names of the MongoDB database and collections used by Storage.
const (
	taskDB        = "task"
	taskTemplates = "taskTemplates"
	taskQueue     = "tasks"
)

// Storage gives access to the task templates collection and the
// tasks (queue) collection stored in MongoDB.
type Storage struct {
	templates *mongo.Collection
	tasks     *mongo.Collection
}

// New creates a Storage that uses the given MongoDB client. Both
// collections are taken from the "task" database.
func New(db *mongo.Client) *Storage {
	database := db.Database(taskDB)
	return &Storage{
		templates: database.Collection(taskTemplates),
		tasks:     database.Collection(taskQueue),
	}
}

// TaskTemplate retrieves the task template whose "name" field equals
// taskName. If no such template exists, an error of kind
// errors.NotFound is returned; any other driver or decoding error is
// returned unchanged.
func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) {
	result := s.templates.FindOne(ctx, bson.M{
		"name": taskName,
	})
	return decodeTask(result, "task template not found")
}

// Add inserts the given task into the tasks collection.
func (s *Storage) Add(ctx context.Context, task *task.Task) error {
	_, err := s.tasks.InsertOne(ctx, task)
	return err
}

// Poll retrieves one task from the tasks collection with the
// older ones being retrieved first (FIFO). It updates the state
// of the task to "pending", so that subsequent calls to Poll would
// not retrieve the same task. The returned task reflects the document
// after the update. If there is no task in the "created" state, an
// error of kind errors.NotFound is returned.
func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
	opts := options.
		FindOneAndUpdate().
		SetSort(bson.M{"createdAt": 1}).
		SetReturnDocument(options.After)

	filter := bson.M{"state": task.Created}
	update := bson.M{"$set": bson.M{"state": task.Pending}}

	result := s.tasks.FindOneAndUpdate(
		ctx,
		filter,
		update,
		opts,
	)
	return decodeTask(result, "task not found")
}

// Ack removes a task from the tasks collection. The number of deleted
// documents is not inspected, so acknowledging a task that is no
// longer stored is not reported as an error.
func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
	_, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID})
	return err
}

// Unack changes the "pending" state of a task to "created", so that
// it can be retrieved for processing again. The task is matched by ID
// only; its current state is not checked and the number of modified
// documents is not inspected.
func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
	filter := bson.M{"id": t.ID}
	update := bson.M{"$set": bson.M{"state": task.Created}}
	_, err := s.tasks.UpdateOne(ctx, filter, update)
	return err
}

// decodeTask converts the result of a single-document operation into a
// task. A mongo.ErrNoDocuments error is mapped to an errors.NotFound
// error carrying notFoundMsg; all other errors are returned as-is.
func decodeTask(result *mongo.SingleResult, notFoundMsg string) (*task.Task, error) {
	if err := result.Err(); err != nil {
		// Compare against the driver's sentinel rather than matching on
		// the error text, which is not a stable contract.
		if stderrors.Is(err, mongo.ErrNoDocuments) {
			return nil, errors.New(errors.NotFound, notFoundMsg)
		}
		return nil, err
	}

	var t task.Task
	if err := result.Decode(&t); err != nil {
		return nil, err
	}
	return &t, nil
}