Skip to content
Snippets Groups Projects
Commit 3b371a45 authored by Yordan Kinkov's avatar Yordan Kinkov
Browse files

#1 WIP: initial tasks, infohub and cache implementation

parent 7210d1e8
No related branches found
No related tags found
No related merge requests found
Pipeline #48894 failed with stages
in 0 seconds
Showing
with 1136 additions and 1 deletion
vendor/**/* -diff
.idea/
\ No newline at end of file
.idea/
# Information HUB service
Provides trusted endpoints for getting, importing and exporting trusted data.
package main
import (
"log"
"net/http"
"time"
"github.com/kelseyhightower/envconfig"
"github.com/penkovski/graceful"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"code.vereign.com/gaiax/tsa/infohub/internal/cache"
"code.vereign.com/gaiax/tsa/infohub/internal/config"
"code.vereign.com/gaiax/tsa/infohub/internal/httpapi"
"code.vereign.com/gaiax/tsa/infohub/internal/tasks"
)
var Version = "0.0.0+development"
func main() {
var cfg config.Config
if err := envconfig.Process("", &cfg); err != nil {
log.Fatal("error loading configuration:", err)
}
logger, err := createLogger()
if err != nil {
log.Fatalln(err)
}
defer logger.Sync() //nolint:errcheck
logger.Info("starting information hub service", zap.String("version", Version))
cache := cache.New(
cfg.Cache.Host,
cfg.Cache.Port,
cfg.Cache.Pass,
cfg.Cache.DB,
logger,
)
tasks := tasks.New(cache, logger)
handler := httpapi.New(tasks, logger)
httpsrv := &http.Server{
Addr: cfg.HTTP.Host + ":" + cfg.HTTP.Port,
Handler: handler,
ReadTimeout: cfg.HTTP.ReadTimeout,
WriteTimeout: cfg.HTTP.WriteTimeout,
IdleTimeout: cfg.HTTP.IdleTimeout,
}
if err := graceful.Start(httpsrv, 20*time.Second); err != nil {
logger.Fatal("server shutdown error", zap.Error(err))
}
}
func createLogger(opts ...zap.Option) (*zap.Logger, error) {
config := zap.NewProductionConfig()
config.DisableStacktrace = true
config.EncoderConfig.TimeKey = "ts"
config.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
return config.Build(opts...)
}
FROM golang:1.17.6
ENV GO111MODULE=on
ENV GOPRIVATE=code.vereign.com
RUN go install github.com/canthefason/go-watcher/cmd/watcher@v0.2.4
ADD . /go/src/code.vereign.com/gaiax/tsa/infohub
WORKDIR /go/src/code.vereign.com/gaiax/tsa/infohub/
RUN go install -mod=vendor ./cmd/infohub/...
EXPOSE 8080
ENTRYPOINT ["sh", "-c", "/go/bin/watcher -run code.vereign.com/gaiax/tsa/infohub/cmd/infohub -watch code.vereign.com/gaiax/tsa/infohub"]
go.mod 0 → 100644
module code.vereign.com/gaiax/tsa/infohub
go 1.17
require (
code.vereign.com/seal/backend/golib.git v0.0.1
github.com/go-redis/redis/v8 v8.4.2
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.8.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/penkovski/graceful v0.0.0-20220129102507-ba0dd223eb96
go.uber.org/zap v1.21.0
)
require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.opentelemetry.io/otel v0.14.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
go.sum 0 → 100644
This diff is collapsed.
package cache
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
)
const queueName = "taskQueue"
var ErrNotFound = errors.New("value not found in cache")
type Cache struct {
logger *zap.Logger
redis *redis.Client
}
func New(addr, port, pass string, db int, logger *zap.Logger) *Cache {
rdb := redis.NewClient(&redis.Options{
Addr: addr + ":" + port,
Password: pass,
DB: db,
})
return &Cache{
logger: logger,
redis: rdb,
}
}
func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error {
return c.redis.Set(ctx, key, value, ttl).Err()
}
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
res, err := c.redis.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return "", ErrNotFound
}
return "", err
}
return res, nil
}
func (c *Cache) Enqueue(ctx context.Context, value string) error {
return c.redis.RPush(ctx, queueName, value).Err()
}
func (c *Cache) Dequeue(ctx context.Context) (string, error) {
res, err := c.redis.LPop(ctx, queueName).Result()
if err != nil {
return "", err
}
return res, nil
}
package config
import "time"
type Config struct {
HTTP httpConfig
Cache cache
}
type httpConfig struct {
Host string `envconfig:"HTTP_HOST"`
Port string `envconfig:"HTTP_PORT" default:"8080"`
IdleTimeout time.Duration `envconfig:"HTTP_IDLE_TIMEOUT" default:"120s"`
ReadTimeout time.Duration `envconfig:"HTTP_READ_TIMEOUT" default:"10s"`
WriteTimeout time.Duration `envconfig:"HTTP_WRITE_TIMEOUT" default:"10s"`
MaxRequestSize int `envconfig:"HTTP_MAX_REQUEST_SIZE" default:"10000"` // 10000 bytes default
}
type cache struct {
Host string `envconfig:"REDIS_HOST"`
Port string `envconfig:"REDIS_PORT" default:"6379"`
Pass string `envconfig:"REDIS_PASS"`
DB int `envconfig:"REDIS_DB" default:"0""`
}
package httpapi
import (
"context"
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/infohub/internal/tasks"
"code.vereign.com/seal/backend/golib.git/errors"
)
type apiHandler struct {
logger *zap.Logger
tasks TasksController
}
type TasksController interface {
GetData(ctx context.Context, key string) (*tasks.GetDataResponse, error)
}
func New(tasks TasksController, logger *zap.Logger) http.Handler {
handler := &apiHandler{
logger: logger,
tasks: tasks,
}
router := mux.NewRouter()
router.Path("/alive").HandlerFunc(handler.Alive).Methods("GET")
router.Path("/ready").HandlerFunc(handler.Ready).Methods("GET")
router.Path("/v1/info").HandlerFunc(handler.GetInfo).Methods("POST")
return router
}
func (h *apiHandler) GetInfo(w http.ResponseWriter, r *http.Request) {
var req GetInfoRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
h.logger.Error("error decoding request", zap.Error(err))
errors.JSON(w, errors.New(errors.BadRequest, "invalid request"))
return
}
res, err := h.tasks.GetData(r.Context(), req.Data)
if err != nil {
h.logger.Error("error getting data from task controller", zap.Error(err))
errors.JSON(w, errors.New(errors.Internal, "failed to get data"))
return
}
response := &GetInfoResponse{
ID: res.ID,
Data: res.Data,
}
if err = json.NewEncoder(w).Encode(response); err != nil {
h.logger.Error("infohub get data: failed to send response", zap.Error(err))
}
}
func (h *apiHandler) Alive(w http.ResponseWriter, r *http.Request) {
h.encodeJSON(w, &struct {
Status string `json:"status"`
}{"up"})
}
func (h *apiHandler) Ready(w http.ResponseWriter, r *http.Request) {
h.encodeJSON(w, &struct {
Status string `json:"status"`
}{"up"})
}
func (h *apiHandler) encodeJSON(w http.ResponseWriter, resp interface{}) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}
package httpapi
type GetInfoRequest struct {
Type string `json:"type"`
Data string `json:"data"`
}
type GetInfoResponse struct {
ID string `json:"id"`
Data string `json:"data"`
}
package tasks
import (
"context"
"encoding/json"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"code.vereign.com/gaiax/tsa/infohub/internal/cache"
"code.vereign.com/seal/backend/golib.git/errors"
)
type Controller struct {
logger *zap.Logger
cache Cache
}
type Cache interface {
Set(ctx context.Context, key, value string, ttl time.Duration) error
Get(ctx context.Context, key string) (string, error)
Enqueue(ctx context.Context, value string) error
Dequeue(ctx context.Context) (string, error)
}
func New(cache Cache, logger *zap.Logger) *Controller {
return &Controller{
logger: logger,
cache: cache,
}
}
func (c *Controller) GetData(ctx context.Context, key string) (*GetDataResponse, error) {
data, err := c.cache.Get(ctx, key)
// key not found in cache
if err == cache.ErrNotFound {
task := createResolveTask(key)
bytes, err := json.Marshal(task)
if err != nil {
return nil, errors.New("failed to encode task", err)
}
err = c.cache.Enqueue(ctx, string(bytes))
if err != nil {
return nil, errors.New("failed to enqueue task", err)
}
return &GetDataResponse{ID: task.ID}, nil
}
if err != nil {
return nil, errors.New("failed get value form cache", err)
}
// key found in cache
return &GetDataResponse{Data: data}, nil
}
func createResolveTask(key string) *Task {
return &Task{
ID: uuid.New().String(),
Url: "uni-resolver-web:8080",
Body: key,
Method: "GET",
}
}
package tasks
type Task struct {
ID string
Url string
Body string
Method string
}
package tasks
type GetDataResponse struct {
ID string
Data string
}
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment