Newer
Older
package storage
import (
"context"
"strings"
"github.com/cenkalti/backoff/v4"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"code.vereign.com/gaiax/tsa/golib/errors"
"code.vereign.com/gaiax/tsa/task/internal/service/task"
)
// Names of the MongoDB database and collections used by this package.
const (
// taskDB is the database name passed to db.Database in New.
taskDB = "task"
// taskTemplates is the collection queried by TaskTemplate.
taskTemplates = "taskTemplates"
// taskQueue is the collection used by Add, Poll, Ack, Unack and Task.
taskQueue = "tasks"
// NOTE(review): the lines that close this const block, declare the
// tasksHistory collection-name constant referenced in New, and open the
// Storage struct appear to be missing from this view — confirm against
// the original file before relying on this span.
//
// The entries below are the collection handles that New assigns on Storage.
templates *mongo.Collection
tasks *mongo.Collection
tasksHistory *mongo.Collection
}
// New builds a Storage whose three collection handles (templates, tasks
// and tasksHistory) all belong to the task database of the given client.
func New(db *mongo.Client) *Storage {
	// Resolve the database handle once and derive every collection from it.
	database := db.Database(taskDB)

	return &Storage{
		templates:    database.Collection(taskTemplates),
		tasks:        database.Collection(taskQueue),
		tasksHistory: database.Collection(tasksHistory),
	}
}
// TaskTemplate looks up the document whose "name" field equals taskName in
// the templates collection and decodes it into a task.Task.
//
// When the lookup error text reports that no documents were found, an
// errors.NotFound error is returned. Any other lookup or decoding error is
// passed back to the caller unchanged.
func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task, error) {
	query := bson.M{"name": taskName}
	res := s.templates.FindOne(ctx, query)

	if err := res.Err(); err != nil {
		// The "not found" case is recognised by the driver's error text.
		if strings.Contains(err.Error(), "no documents in result") {
			return nil, errors.New(errors.NotFound, "task template not found")
		}
		return nil, err
	}

	tpl := new(task.Task)
	if err := res.Decode(tpl); err != nil {
		return nil, err
	}
	return tpl, nil
}
// Add inserts the given task as a new document in the tasks collection
// and returns the error, if any, reported by the insert.
func (s *Storage) Add(ctx context.Context, task *task.Task) error {
	if _, err := s.tasks.InsertOne(ctx, task); err != nil {
		return err
	}
	return nil
}
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Poll fetches a single task in the "created" state from the tasks
// collection, sorted by ascending "createdAt" so that older tasks come
// first (FIFO). In the same operation the task's state is set to
// "pending", so that later Poll calls do not pick the same task again.
// The returned task is the document as it looks after the update.
//
// When the error text reports that no documents matched, an errors.NotFound
// error is returned; other lookup or decoding errors are returned as-is.
func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
	findOpts := options.FindOneAndUpdate()
	findOpts.SetSort(bson.M{"createdAt": 1})
	findOpts.SetReturnDocument(options.After)

	res := s.tasks.FindOneAndUpdate(
		ctx,
		bson.M{"state": task.Created},
		bson.M{"$set": bson.M{"state": task.Pending}},
		findOpts,
	)

	if err := res.Err(); err != nil {
		// The "nothing to poll" case is recognised by the driver's error text.
		if strings.Contains(err.Error(), "no documents in result") {
			return nil, errors.New(errors.NotFound, "task not found")
		}
		return nil, err
	}

	polled := new(task.Task)
	if err := res.Decode(polled); err != nil {
		return nil, err
	}
	return polled, nil
}
// Ack deletes from the `tasks` collection the document whose "id" field
// matches the ID of the given task.
func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
	byID := bson.M{"id": task.ID}
	if _, err := s.tasks.DeleteOne(ctx, byID); err != nil {
		return err
	}
	return nil
}
// Unack sets the state of the task with the given ID back to "created",
// so that it can be picked up for processing again.
func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
	if _, err := s.tasks.UpdateOne(
		ctx,
		bson.M{"id": t.ID},
		bson.M{"$set": bson.M{"state": task.Created}},
	); err != nil {
		return err
	}
	return nil
}

Lyuben Penkovski
committed
// SaveTaskHistory saves a task to the `tasksHistory` collection.
func (s *Storage) SaveTaskHistory(ctx context.Context, task *task.Task) error {
insert := func() error {
_, err := s.tasksHistory.InsertOne(ctx, task)
return err
}
b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
return backoff.Retry(insert, b)

Lyuben Penkovski
committed
}
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Task looks up the document whose "id" field equals taskID in the tasks
// collection and decodes it into a task.Task.
//
// When the lookup error text reports that no documents were found, an
// errors.NotFound error is returned. Any other lookup or decoding error is
// passed back to the caller unchanged.
func (s *Storage) Task(ctx context.Context, taskID string) (*task.Task, error) {
	query := bson.M{"id": taskID}
	res := s.tasks.FindOne(ctx, query)

	if err := res.Err(); err != nil {
		// The "not found" case is recognised by the driver's error text.
		if strings.Contains(err.Error(), "no documents in result") {
			return nil, errors.New(errors.NotFound, "task not found")
		}
		return nil, err
	}

	found := new(task.Task)
	if err := res.Decode(found); err != nil {
		return nil, err
	}
	return found, nil
}
// TaskHistory looks up the document whose "id" field equals taskID in the
// tasksHistory collection and decodes it into a task.Task.
//
// When the lookup error text reports that no documents were found, an
// errors.NotFound error is returned. Any other lookup or decoding error is
// passed back to the caller unchanged.
func (s *Storage) TaskHistory(ctx context.Context, taskID string) (*task.Task, error) {
	query := bson.M{"id": taskID}
	res := s.tasksHistory.FindOne(ctx, query)

	if err := res.Err(); err != nil {
		// The "not found" case is recognised by the driver's error text.
		if strings.Contains(err.Error(), "no documents in result") {
			return nil, errors.New(errors.NotFound, "task not found")
		}
		return nil, err
	}

	archived := new(task.Task)
	if err := res.Decode(archived); err != nil {
		return nil, err
	}
	return archived, nil
}