From 2bfcd3680ff9aecb9fc862c8aad5a3c74039194c Mon Sep 17 00:00:00 2001
From: Lyuben Penkovski <lyuben.penkovski@vereign.com>
Date: Wed, 6 Apr 2022 19:21:53 +0300
Subject: [PATCH] Queue-like interface for retrieving tasks from storage

---
 internal/service/task/service.go |  7 +++--
 internal/storage/storage.go      | 52 +++++++++++++++++++++++++++++++-
 2 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/internal/service/task/service.go b/internal/service/task/service.go
index 024e111..47a095c 100644
--- a/internal/service/task/service.go
+++ b/internal/service/task/service.go
@@ -17,7 +17,10 @@ type Storage interface {
 }
 
 type Queue interface {
-	AddTask(ctx context.Context, task *Task) error
+	Add(ctx context.Context, task *Task) error
+	Poll(ctx context.Context) (*Task, error)
+	Ack(ctx context.Context, task *Task) error
+	Unack(ctx context.Context, task *Task) error
 }
 
 type Service struct {
@@ -56,7 +59,7 @@ func (s *Service) Create(ctx context.Context, req *goatask.CreateRequest) (res *
 	task.CreatedAt = time.Now()
 	task.Request = taskRequest
 
-	if err := s.queue.AddTask(ctx, task); err != nil {
+	if err := s.queue.Add(ctx, task); err != nil {
 		logger.Error("error adding task to queue", zap.Error(err))
 		return nil, errors.New("failed to create task", err)
 	}
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index be5eff9..09bb739 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -6,6 +6,7 @@ import (
 
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
 
 	"code.vereign.com/gaiax/tsa/golib/errors"
 	"code.vereign.com/gaiax/tsa/task/internal/service/task"
@@ -49,7 +50,56 @@ func (s *Storage) TaskTemplate(ctx context.Context, taskName string) (*task.Task
 	return &task, nil
 }
 
-func (s *Storage) AddTask(ctx context.Context, task *task.Task) error {
+func (s *Storage) Add(ctx context.Context, task *task.Task) error {
 	_, err := s.tasks.InsertOne(ctx, task)
 	return err
 }
+
+// Poll retrieves one task from the tasks collection with the
+// older ones being retrieved first (FIFO). It updates the state
+// of the task to "pending", so that consequent calls to Poll would
+// not retrieve the same task.
+func (s *Storage) Poll(ctx context.Context) (*task.Task, error) {
+	opts := options.
+		FindOneAndUpdate().
+		SetSort(bson.M{"createdAt": 1}).
+		SetReturnDocument(options.After)
+
+	filter := bson.M{"state": task.Created}
+	update := bson.M{"$set": bson.M{"state": task.Pending}}
+	result := s.tasks.FindOneAndUpdate(
+		ctx,
+		filter,
+		update,
+		opts,
+	)
+
+	if result.Err() != nil {
+		if strings.Contains(result.Err().Error(), "no documents in result") {
+			return nil, errors.New(errors.NotFound, "task not found")
+		}
+		return nil, result.Err()
+	}
+
+	var task task.Task
+	if err := result.Decode(&task); err != nil {
+		return nil, err
+	}
+
+	return &task, nil
+}
+
+// Ack removes a task from the tasks collection.
+func (s *Storage) Ack(ctx context.Context, task *task.Task) error {
+	_, err := s.tasks.DeleteOne(ctx, bson.M{"id": task.ID})
+	return err
+}
+
+// Unack changes the "pending" state of a task to "created", so that
+// it can be retrieved for processing again.
+func (s *Storage) Unack(ctx context.Context, t *task.Task) error {
+	filter := bson.M{"id": t.ID}
+	update := bson.M{"$set": bson.M{"state": task.Created}}
+	_, err := s.tasks.UpdateOne(ctx, filter, update)
+	return err
+}
-- 
GitLab