Skip to content
Snippets Groups Projects
storage.go 2.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • package storage
    
    import (
    	"context"
    	"strings"
    
    	"go.mongodb.org/mongo-driver/bson"
    	"go.mongodb.org/mongo-driver/mongo"
    
    	"go.mongodb.org/mongo-driver/mongo/options"
    
    
    	"code.vereign.com/gaiax/tsa/golib/errors"
    	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    )
    
    const (
    	taskDB        = "task"
    	taskTemplates = "taskTemplates"
    	taskQueue     = "tasks"
    
    	tasksHistory  = "tasksHistory"
    
    )
    
    type Storage struct {
    
    	templates    *mongo.Collection
    	tasks        *mongo.Collection
    	tasksHistory *mongo.Collection
    
    }
    
    func New(db *mongo.Client) *Storage {
    	return &Storage{
    
    		templates:    db.Database(taskDB).Collection(taskTemplates),
    		tasks:        db.Database(taskDB).Collection(taskQueue),
    		tasksHistory: db.Database(taskDB).Collection(tasksHistory),
    
    	}
    }
    
    func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) {
    	result := s.templates.FindOne(ctx, bson.M{
    		"name": taskName,
    	})
    
    	if result.Err() != nil {
    		if strings.Contains(result.Err().Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task template not found")
    		}
    		return nil, result.Err()
    	}
    
    	var task task.Task
    	if err := result.Decode(&task); err != nil {
    		return nil, err
    	}
    
    	return &task, nil
    }
    
    
    func (s *Storage) Add(ctx context.Context, task *task.Task) error {
    
    	_, err := s.tasks.InsertOne(ctx, task)
    	return err
    }
    
    
    // Poll retrieves one task from the tasks collection with the
    // older ones being retrieved first (FIFO). It updates the state
    // of the task to "pending", so that consequent calls to Poll would
    // not retrieve the same task.
    func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
    	opts := options.
    		FindOneAndUpdate().
    		SetSort(bson.M{"createdAt": 1}).
    		SetReturnDocument(options.After)
    
    	filter := bson.M{"state": task.Created}
    	update := bson.M{"$set": bson.M{"state": task.Pending}}
    	result := s.tasks.FindOneAndUpdate(
    		ctx,
    		filter,
    		update,
    		opts,
    	)
    
    	if result.Err() != nil {
    		if strings.Contains(result.Err().Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task not found")
    		}
    		return nil, result.Err()
    	}
    
    	var task task.Task
    	if err := result.Decode(&task); err != nil {
    		return nil, err
    	}
    
    	return &task, nil
    }
    
    
    // Ack removes a task from the `tasks` collection.
    
    func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
    	_, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID})
    	return err
    }
    
    // Unack changes the "pending" state of a task to "created", so that
    // it can be retrieved for processing again.
    func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
    	filter := bson.M{"id": t.ID}
    	update := bson.M{"$set": bson.M{"state": task.Created}}
    	_, err := s.tasks.UpdateOne(ctx, filter, update)
    	return err
    }