Skip to content
Snippets Groups Projects
storage.go 6.25 KiB
Newer Older
  • Learn to ignore specific revisions
  • package storage
    
    import (
    	"context"
    	"strings"
    
    
    	"github.com/cenkalti/backoff/v4"
    
    	"go.mongodb.org/mongo-driver/bson"
    	"go.mongodb.org/mongo-driver/mongo"
    
    	"go.mongodb.org/mongo-driver/mongo/options"
    
    
    	"code.vereign.com/gaiax/tsa/golib/errors"
    	"code.vereign.com/gaiax/tsa/task/internal/service/task"
    
    	"code.vereign.com/gaiax/tsa/task/internal/service/tasklist"
    
    	taskDB            = "task"
    	taskTemplates     = "taskTemplates"
    	taskQueue         = "tasks"
    	tasksHistory      = "tasksHistory"
    	taskListQueue     = "taskLists"
    	taskListTemplates = "taskListTemplates"
    
    )
    
    // Storage groups the MongoDB collection handles used by the task
    // service. All handles are created by New.
    type Storage struct {
    
    	// taskTemplates holds task definitions; queried by "name".
    	taskTemplates     *mongo.Collection
    	// tasks is the queue of tasks that Poll, Ack and Unack operate on.
    	tasks             *mongo.Collection
    	// tasksHistory receives tasks written by SaveTaskHistory.
    	tasksHistory      *mongo.Collection
    	// taskLists is the queue of task lists written by AddTaskList.
    	taskLists         *mongo.Collection
    	// taskListTemplates holds taskList definitions; queried by "name".
    	taskListTemplates *mongo.Collection
    
    }
    
    // New builds a Storage whose collection handles all belong to the
    // task database of the given MongoDB client.
    func New(db *mongo.Client) *Storage {
    	database := db.Database(taskDB)
    
    	st := new(Storage)
    	st.taskTemplates = database.Collection(taskTemplates)
    	st.tasks = database.Collection(taskQueue)
    	st.tasksHistory = database.Collection(tasksHistory)
    	st.taskListTemplates = database.Collection(taskListTemplates)
    	st.taskLists = database.Collection(taskListQueue)
    	return st
    }
    
    // TaskTemplate retrieves one task definition by name from storage.
    //
    // When no template matches the name, an error of kind errors.NotFound
    // is returned; any other lookup or decoding error is returned as is.
    func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) {
    	byName := bson.M{"name": taskName}
    	found := s.taskTemplates.FindOne(ctx, byName)
    
    	if err := found.Err(); err != nil {
    		// NOTE(review): "not found" is detected by message text; consider
    		// comparing against the driver's sentinel error instead.
    		if strings.Contains(err.Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task template not found")
    		}
    		return nil, err
    	}
    
    	tpl := new(task.Task)
    	if err := found.Decode(tpl); err != nil {
    		return nil, err
    	}
    	return tpl, nil
    }
    
    
    // Add inserts the given task into the tasks collection.
    func (s *Storage) Add(ctx context.Context, task *task.Task) error {
    	if _, err := s.tasks.InsertOne(ctx, task); err != nil {
    		return err
    	}
    	return nil
    }
    
    // Poll retrieves one task with empty groupID from the tasks collection,
    // with the older ones being retrieved first (FIFO). The same call switches
    // the state of the task to "pending", so that subsequent calls to Poll
    // do not retrieve the same task.
    //
    // When no task matches, an error of kind errors.NotFound is returned.
    func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
    	res := s.tasks.FindOneAndUpdate(
    		ctx,
    		// only tasks that are still waiting and belong to no group
    		bson.M{"state": task.Created, "groupid": ""},
    		bson.M{"$set": bson.M{"state": task.Pending}},
    		options.
    			FindOneAndUpdate().
    			SetSort(bson.M{"createdAt": 1}).
    			SetReturnDocument(options.After),
    	)
    
    	if err := res.Err(); err != nil {
    		// NOTE(review): "not found" is detected by message text; consider
    		// comparing against the driver's sentinel error instead.
    		if strings.Contains(err.Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task not found")
    		}
    		return nil, err
    	}
    
    	polled := new(task.Task)
    	if err := res.Decode(polled); err != nil {
    		return nil, err
    	}
    	return polled, nil
    }
    
    
    // Ack removes a task from the `tasks` collection. The task to delete
    // is selected by its ID.
    func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
    	byID := bson.M{"id": task.ID}
    	if _, err := s.tasks.DeleteOne(ctx, byID); err != nil {
    		return err
    	}
    	return nil
    }
    
    // Unack puts a task back into the "created" state, so that
    // it can be retrieved for processing again. The task is selected
    // by its ID.
    func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
    	_, err := s.tasks.UpdateOne(
    		ctx,
    		bson.M{"id": t.ID},
    		bson.M{"$set": bson.M{"state": task.Created}},
    	)
    	return err
    }
    
    
    // SaveTaskHistory saves a task to the `tasksHistory` collection.
    func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error {
    
    	insert := func() error {
    		_, err := s.tasksHistory.InsertOne(ctx, task)
    		return err
    	}
    
    	b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
    	return backoff.Retry(insert, b)
    
    
    // Task retrieves one task by ID from the tasks collection.
    //
    // When no task matches the ID, an error of kind errors.NotFound is
    // returned; any other lookup or decoding error is returned as is.
    func (s *Storage) Task(ctx context.Context, taskID string) (*task.Task, error) {
    	byID := bson.M{"id": taskID}
    	found := s.tasks.FindOne(ctx, byID)
    
    	if err := found.Err(); err != nil {
    		// NOTE(review): "not found" is detected by message text; consider
    		// comparing against the driver's sentinel error instead.
    		if strings.Contains(err.Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task not found")
    		}
    		return nil, err
    	}
    
    	t := new(task.Task)
    	if err := found.Decode(t); err != nil {
    		return nil, err
    	}
    	return t, nil
    }
    
    // TaskHistory retrieves one task by ID from the tasksHistory collection.
    //
    // When no task matches the ID, an error of kind errors.NotFound is
    // returned; any other lookup or decoding error is returned as is.
    func (s *Storage) TaskHistory(ctx context.Context, taskID string) (*task.Task, error) {
    	byID := bson.M{"id": taskID}
    	found := s.tasksHistory.FindOne(ctx, byID)
    
    	if err := found.Err(); err != nil {
    		// NOTE(review): "not found" is detected by message text; consider
    		// comparing against the driver's sentinel error instead.
    		if strings.Contains(err.Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "task not found")
    		}
    		return nil, err
    	}
    
    	t := new(task.Task)
    	if err := found.Decode(t); err != nil {
    		return nil, err
    	}
    	return t, nil
    }
    
    
    // TaskListTemplate retrieves one taskList definition by name from storage.
    //
    // When no template matches the name, an error of kind errors.NotFound
    // is returned; any other lookup or decoding error is returned as is.
    func (s *Storage) TaskListTemplate(ctx context.Context, taskListName string) (*tasklist.Template, error) {
    	byName := bson.M{"name": taskListName}
    	found := s.taskListTemplates.FindOne(ctx, byName)
    
    	if err := found.Err(); err != nil {
    		// NOTE(review): "not found" is detected by message text; consider
    		// comparing against the driver's sentinel error instead.
    		if strings.Contains(err.Error(), "no documents in result") {
    			return nil, errors.New(errors.NotFound, "taskList template not found")
    		}
    		return nil, err
    	}
    
    	tpl := new(tasklist.Template)
    	if err := found.Decode(tpl); err != nil {
    		return nil, err
    	}
    	return tpl, nil
    }
    
    // TaskTemplates retrieves task definitions from storage by names.
    //
    // The result is a map where 'key' is the task name and 'value' is the task
    // definition. Names without a matching template are simply absent from the
    // map. An error is returned if the query, the decoding of any document, or
    // the cursor iteration itself fails.
    func (s *Storage) TaskTemplates(ctx context.Context, names []string) (map[string]*task.Task, error) {
    	cursor, err := s.taskTemplates.Find(ctx, bson.M{
    		"name": bson.M{"$in": names},
    	})
    	if err != nil {
    		return nil, err
    	}
    	defer cursor.Close(ctx)
    
    	// At most one entry per requested name ends up in the map.
    	res := make(map[string]*task.Task, len(names))
    	for cursor.Next(ctx) {
    		var t task.Task
    		if err := cursor.Decode(&t); err != nil {
    			return nil, err
    		}
    		res[t.Name] = &t
    	}
    
    	// Next reports false both when the results are exhausted and when
    	// iteration fails; only Err distinguishes the two, so check it to
    	// avoid returning a partial result as success.
    	if err := cursor.Err(); err != nil {
    		return nil, err
    	}
    
    	return res, nil
    }
    
    // AddTaskList inserts a taskList into the taskLists collection and then
    // inserts all of its tasks into the tasks collection.
    //
    // If inserting the tasks fails, the taskList is removed again via AckList
    // and the insert error is returned; if that removal fails as well, the
    // removal error is returned instead.
    func (s *Storage) AddTaskList(ctx context.Context, taskList *tasklist.TaskList, tasks []*task.Task) error {
    	if _, err := s.taskLists.InsertOne(ctx, taskList); err != nil {
    		return err
    	}
    
    	// InsertMany expects a slice of interface values.
    	docs := make([]interface{}, len(tasks))
    	for i := range tasks {
    		docs[i] = tasks[i]
    	}
    
    	_, insertErr := s.tasks.InsertMany(ctx, docs)
    	if insertErr == nil {
    		return nil
    	}
    
    	// remove taskList from queue
    	if ackErr := s.AckList(ctx, taskList); ackErr != nil {
    		return errors.New("failed to ack taskList", ackErr)
    	}
    	return insertErr
    }
    
    // AckList removes a taskList from the `taskLists` collection. The
    // taskList to delete is selected by its ID.
    func (s *Storage) AckList(ctx context.Context, taskList *tasklist.TaskList) error {
    	byID := bson.M{"id": taskList.ID}
    	if _, err := s.taskLists.DeleteOne(ctx, byID); err != nil {
    		return err
    	}
    	return nil
    }