Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gaiax/tsa/infohub
1 result
Show changes
Commits on Source (2)
Showing
with 1243 additions and 1 deletion
vendor/**/* -diff
.idea/
\ No newline at end of file
.idea/
# Information HUB service
Provides trusted endpoints for getting, importing and exporting trusted data.
package main
import (
"context"
"log"
"net/http"
"time"
"github.com/kelseyhightower/envconfig"
"github.com/penkovski/graceful"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"code.vereign.com/gaiax/tsa/infohub/internal/cache"
"code.vereign.com/gaiax/tsa/infohub/internal/config"
"code.vereign.com/gaiax/tsa/infohub/internal/httpapi"
"code.vereign.com/gaiax/tsa/infohub/internal/tasks"
)
var Version = "0.0.0+development"
func main() {
var cfg config.Config
if err := envconfig.Process("", &cfg); err != nil {
log.Fatal("error loading configuration:", err)
}
logger, err := createLogger()
if err != nil {
log.Fatalln(err)
}
defer logger.Sync() //nolint:errcheck
logger.Info("starting information hub service", zap.String("version", Version))
cache := cache.New(
cfg.Cache.Host,
cfg.Cache.Port,
cfg.Cache.Pass,
cfg.Cache.DB,
logger,
)
tasks := tasks.New(cache, logger)
go tasks.Start(context.Background())
handler := httpapi.New(tasks, logger)
httpsrv := &http.Server{
Addr: cfg.HTTP.Host + ":" + cfg.HTTP.Port,
Handler: handler,
ReadTimeout: cfg.HTTP.ReadTimeout,
WriteTimeout: cfg.HTTP.WriteTimeout,
IdleTimeout: cfg.HTTP.IdleTimeout,
}
if err := graceful.Start(httpsrv, 20*time.Second); err != nil {
logger.Fatal("server shutdown error", zap.Error(err))
}
}
func createLogger(opts ...zap.Option) (*zap.Logger, error) {
config := zap.NewProductionConfig()
config.DisableStacktrace = true
config.EncoderConfig.TimeKey = "ts"
config.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
return config.Build(opts...)
}
FROM golang:1.17.6
ENV GO111MODULE=on
ENV GOPRIVATE=code.vereign.com
RUN go install github.com/canthefason/go-watcher/cmd/watcher@v0.2.4
ADD . /go/src/code.vereign.com/gaiax/tsa/infohub
WORKDIR /go/src/code.vereign.com/gaiax/tsa/infohub/
RUN go install -mod=vendor ./cmd/infohub/...
EXPOSE 8080
ENTRYPOINT ["sh", "-c", "/go/bin/watcher -run code.vereign.com/gaiax/tsa/infohub/cmd/infohub -watch code.vereign.com/gaiax/tsa/infohub"]
module code.vereign.com/gaiax/tsa/infohub
go 1.17
require (
code.vereign.com/seal/backend/golib.git v0.0.1
github.com/go-redis/redis/v8 v8.4.2
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.8.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/penkovski/graceful v0.0.0-20220129102507-ba0dd223eb96
go.uber.org/zap v1.21.0
)
require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.opentelemetry.io/otel v0.14.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
This diff is collapsed.
package cache
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
)
const queueName = "taskQueue"
var ErrNotFound = errors.New("value not found in cache")
type Cache struct {
logger *zap.Logger
redis *redis.Client
}
func New(addr, port, pass string, db int, logger *zap.Logger) *Cache {
rdb := redis.NewClient(&redis.Options{
Addr: addr + ":" + port,
Password: pass,
DB: db,
})
return &Cache{
logger: logger,
redis: rdb,
}
}
func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error {
return c.redis.Set(ctx, key, value, ttl).Err()
}
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
res, err := c.redis.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return "", ErrNotFound
}
return "", err
}
return res, nil
}
func (c *Cache) Enqueue(ctx context.Context, value string) error {
return c.redis.RPush(ctx, queueName, value).Err()
}
func (c *Cache) Dequeue(ctx context.Context) (string, error) {
res, err := c.redis.LPop(ctx, queueName).Result()
if err != nil {
return "", err
}
return res, nil
}
package config
import "time"
type Config struct {
HTTP httpConfig
Cache cache
}
type httpConfig struct {
Host string `envconfig:"HTTP_HOST"`
Port string `envconfig:"HTTP_PORT" default:"8080"`
IdleTimeout time.Duration `envconfig:"HTTP_IDLE_TIMEOUT" default:"120s"`
ReadTimeout time.Duration `envconfig:"HTTP_READ_TIMEOUT" default:"10s"`
WriteTimeout time.Duration `envconfig:"HTTP_WRITE_TIMEOUT" default:"10s"`
MaxRequestSize int `envconfig:"HTTP_MAX_REQUEST_SIZE" default:"10000"` // 10000 bytes default
}
type cache struct {
Host string `envconfig:"REDIS_HOST"`
Port string `envconfig:"REDIS_PORT" default:"6379"`
Pass string `envconfig:"REDIS_PASS"`
DB int `envconfig:"REDIS_DB" default:"0""`
}
package httpapi
import (
"context"
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/infohub/internal/tasks"
"code.vereign.com/seal/backend/golib.git/errors"
)
type apiHandler struct {
logger *zap.Logger
tasks TasksController
}
type TasksController interface {
GetData(ctx context.Context, key string) (*tasks.GetDataResponse, error)
}
func New(tasks TasksController, logger *zap.Logger) http.Handler {
handler := &apiHandler{
logger: logger,
tasks: tasks,
}
router := mux.NewRouter()
router.Path("/alive").HandlerFunc(handler.Alive).Methods("GET")
router.Path("/ready").HandlerFunc(handler.Ready).Methods("GET")
router.Path("/v1/info").HandlerFunc(handler.GetInfo).Methods("POST")
return router
}
func (h *apiHandler) GetInfo(w http.ResponseWriter, r *http.Request) {
var req GetInfoRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
h.logger.Error("error decoding request", zap.Error(err))
errors.JSON(w, errors.New(errors.BadRequest, "invalid request"))
return
}
res, err := h.tasks.GetData(r.Context(), req.Data)
if err != nil {
h.logger.Error("error getting data from task controller", zap.Error(err))
errors.JSON(w, errors.New(errors.Internal, "failed to get data"))
return
}
response := &GetInfoResponse{
ID: res.ID,
Data: res.Data,
}
if err = json.NewEncoder(w).Encode(response); err != nil {
h.logger.Error("infohub get data: failed to send response", zap.Error(err))
}
}
func (h *apiHandler) Alive(w http.ResponseWriter, r *http.Request) {
h.encodeJSON(w, &struct {
Status string `json:"status"`
}{"up"})
}
func (h *apiHandler) Ready(w http.ResponseWriter, r *http.Request) {
h.encodeJSON(w, &struct {
Status string `json:"status"`
}{"up"})
}
func (h *apiHandler) encodeJSON(w http.ResponseWriter, resp interface{}) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}
package httpapi
type GetInfoRequest struct {
Type string `json:"type"`
Data string `json:"data"`
}
type GetInfoResponse struct {
ID string `json:"id"`
Data string `json:"data"`
}
package tasks
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/infohub/internal/cache"
"code.vereign.com/seal/backend/golib.git/errors"
)
type Controller struct {
logger *zap.Logger
cache Cache
taskChan chan Task
}
type Cache interface {
Set(ctx context.Context, key, value string, ttl time.Duration) error
Get(ctx context.Context, key string) (string, error)
Enqueue(ctx context.Context, value string) error
Dequeue(ctx context.Context) (string, error)
}
func New(cache Cache, logger *zap.Logger) *Controller {
return &Controller{
logger: logger,
cache: cache,
taskChan: make(chan Task),
}
}
func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, error) {
data, err := c.cache.Get(ctx, key)
// key not found in cache
if err == cache.ErrNotFound {
task := createResolveTask(key) // create task
err = c.queueTask(ctx, task) // add task in queue
if err != nil {
return nil, errors.New("adding task to queue failed", err)
}
c.logger.Info(fmt.Sprintf("task with ID: %s is added to queue", task.ID))
return &GetDataResponse{ID: task.ID}, nil
}
if err != nil {
return nil, errors.New("failed get value form cache", err)
}
// key found in cache
return &GetDataResponse{Data: data}, nil
}
func (c *Controller) Start(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
c.executeWorker(ctx, c.taskChan)
}()
for {
select {
case <-ctx.Done():
wg.Wait()
return
case <-time.After(30 * time.Second):
var task Task
val, err := c.cache.Dequeue(ctx)
if val == "" {
continue
}
if err != nil {
c.logger.Error("failed to dequeue task", zap.Error(err))
continue
}
err = json.Unmarshal([]byte(val), &task)
if err != nil {
c.logger.Error("failed to decode task from queue", zap.Error(err))
continue
}
c.logger.Info(fmt.Sprintf("task with ID: %s is pulled out of the queue for execution", task.ID))
c.taskChan <- task
}
}
}
func (c *Controller) executeWorker(ctx context.Context, taskChan chan Task) {
for {
select {
case <-ctx.Done():
c.logger.Info("task executor worker stopped", zap.Error(ctx.Err()))
return
case task := <-c.taskChan:
err := c.executeTask(ctx, task)
if err != nil {
c.logger.Error("failed to execute task", zap.Error(err))
// TODO(kinkov): handle failed task properly
continue
}
}
}
}
func (c *Controller) executeTask(ctx context.Context, task Task) error {
var url string
var body io.Reader
if task.Method == "GET" {
url = task.Url + task.Body
}
// TODO(kinkov): implement POST method
req, err := http.NewRequest(task.Method, url, body)
if err != nil {
return err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
err = c.cache.Set(ctx, task.ID, string(bytes), 0)
if err != nil {
return err
}
err = c.cache.Set(ctx, task.Body, string(bytes), 0)
if err != nil {
return err
}
c.logger.Info(fmt.Sprintf("task with ID: %s is executed successfully", task.ID))
return nil
}
func (c *Controller) queueTask(ctx context.Context, task Task) error {
bytes, err := json.Marshal(task)
if err != nil {
return errors.New("failed to encode task", err)
}
err = c.cache.Enqueue(ctx, string(bytes)) // add task in queue
if err != nil {
return errors.New("failed to enqueue task", err)
}
return nil
}
func createResolveTask(key string) Task {
return Task{
ID: uuid.New().String(),
Url: "http://uni-resolver-web:8080/1.0/identifiers/",
Body: key,
Method: "GET",
}
}
package tasks
type Task struct {
ID string
Url string
Body string
Method string
}
package tasks
type GetDataResponse struct {
ID string
Data string
}
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.