Newer
Older
package server
/*
Copyright (c) 2018 Vereign AG [https://www.vereign.com]
This is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"code.vereign.com/code/key-storage-agent/config"
"code.vereign.com/code/key-storage-agent/handler"
"code.vereign.com/code/key-storage-agent/session"
"code.vereign.com/code/key-storage-agent/utils"
"code.vereign.com/code/viam-apis/authentication"
api "code.vereign.com/code/viam-apis/key-storage-agent/api"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// Registry of the Prometheus summaries handed out by createQueryTime.
var (
	// mutex guards every access to summaries.
	mutex sync.RWMutex
	// summaries holds the already registered summaries, keyed by metric name.
	summaries = make(map[string]prometheus.Summary)
)

// contextKey is a private type for context value keys, so keys defined in
// this package cannot collide with keys defined elsewhere.
type contextKey int

// clientIDKey is the context key under which unaryInterceptor stores the
// authenticated client ID.
const clientIDKey contextKey = iota

// PEM material of the gRPC server, assigned by StartGRPCServer.
var (
	pkgCertPEM   []byte
	pkgKeyPEM    []byte
	pkgCaCertPEM []byte
)
// credMatcher decides which header names are accepted. Only the exact name
// "Session" is accepted and it is returned unchanged; for every other name
// the result is ("", false).
func credMatcher(headerName string) (mdName string, ok bool) {
	switch headerName {
	case "Session":
		return headerName, true
	default:
		return "", false
	}
}
// authenticateClient checks the credentials carried in the incoming gRPC
// metadata of ctx and returns the caller's UUID on success.
//
// The "uuid" and "session" metadata values are each joined into a single
// string. A caller whose UUID equals config.SystemAuth.Uuid must present
// config.SystemAuth.Session; any other caller is validated with
// session.CheckSession, using a data storage client created with the system
// credentials. An error is returned when ctx carries no incoming metadata or
// when the session is rejected.
//
// The s and invokedMethod parameters are not used in the body shown here.
func authenticateClient(ctx context.Context, s *handler.KeyStorageServerImpl, invokedMethod string) (string, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
// Credentials presented by the caller.
clientAuth := &authentication.Authentication{
Uuid: strings.Join(md["uuid"], ""),
Session: strings.Join(md["session"], ""),
}
// Credentials of the system account, taken from configuration.
viamAuth := &authentication.Authentication{
Uuid: config.SystemAuth.Uuid,
Session: config.SystemAuth.Session,
// The client is created before the UUID comparison below, so it is also
// created (and closed on return) for system callers that never reach
// session.CheckSession.
sessionClient := utils.CreateDataStorageClient(viamAuth)
defer sessionClient.CloseClient()
if clientAuth.Uuid == viamAuth.Uuid {
// System caller: the session must match the configured one exactly.
if clientAuth.Session != viamAuth.Session {
// NOTE(review): the rejected session value is echoed in the error text —
// confirm this is acceptable to expose.
return "", fmt.Errorf("bad session %s", clientAuth.Session)
}
} else {
// Any other caller: delegate the decision to session.CheckSession.
if session.CheckSession(clientAuth.Uuid, clientAuth.Session, sessionClient) == false {
return "", fmt.Errorf("bad session %s", clientAuth.Session)
}
}
return clientAuth.Uuid, nil
}
// No incoming metadata on the context at all.
return "", fmt.Errorf("missing credentials")
}
// unaryInterceptor authenticates a unary call before it reaches its handler.
//
// The call fails when info.Server is not a *handler.KeyStorageServerImpl or
// when authenticateClient returns an error. Otherwise the authenticated
// client ID is stored in the context under clientIDKey and handler1 is
// invoked with that context. A handler error is passed to errors.Log and is
// still returned to the caller together with the handler's result.
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler1 grpc.UnaryHandler) (interface{}, error) {
s, ok := info.Server.(*handler.KeyStorageServerImpl)
if !ok {
return nil, fmt.Errorf("unable to cast server")
}
clientID, err := authenticateClient(ctx, s, info.FullMethod)
if err != nil {
return nil, err
}
// Make the authenticated client ID available to the handler.
ctx = context.WithValue(ctx, clientIDKey, clientID)
res, err := handler1(ctx, req)
if err != nil {
errors.Log(err)
}
return res, err
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// createQueryTime returns the Prometheus summary used for the gRPC request
// duration of funcName, creating and registering it on first use.
//
// The metric name is funcName with every "/" and "." replaced by "_",
// followed by "_grpc_request_duration_seconds". When config.MetricEnvPrefix
// is non-empty, the name is additionally prefixed with it and "_". Summaries
// are cached by metric name in summaries while holding mutex.
//
// It returns nil when config.PrometeusListenAddress is empty.
func createQueryTime(funcName string) prometheus.Summary {
	if config.PrometeusListenAddress == "" {
		return nil
	}

	// Both separators map to "_", so a single replacer pass gives the same
	// result as replacing them one after the other.
	name := strings.NewReplacer("/", "_", ".", "_").Replace(funcName) + "_grpc_request_duration_seconds"
	if config.MetricEnvPrefix != "" {
		name = config.MetricEnvPrefix + "_" + name
	}

	mutex.Lock()
	defer mutex.Unlock()

	if cached, found := summaries[name]; found {
		return cached
	}

	created := prometheus.NewSummary(prometheus.SummaryOpts{
		Name: name,
		Help: "grpc request duration seconds of /" + funcName + " for " + config.MetricEnvPrefix + " env",
	})
	// Register first, then cache, so each metric name is registered only once.
	prometheus.MustRegister(created)
	summaries[name] = created
	return created
}
// StartGRPCServer listens on address and serves the KeyStorage gRPC API. It
// returns only after grpcServer.Serve returns.
//
// Every unary call goes through unaryInterceptor. When config.UseTLS is set,
// the server presents the certPEM/privateKeyPEM key pair and uses the system
// certificate pool, extended with caCertPEM, as its client CA pool. All PEM
// inputs, dataStorageAddress and maxMessageSize are stored on the
// handler.KeyStorageServerImpl that backs the service; the server
// certificate, key and CA are also copied into package-level variables.
//
// Among the visible error paths, it returns an error when the key pair cannot
// be loaded, when caCertPEM cannot be appended to the pool, or when
// grpcServer.Serve fails.
func StartGRPCServer(address string, certPEM, privateKeyPEM, caCertPEM, vereignCertPEM, vereignPrivateKeyPEM []byte, dataStorageAddress string, maxMessageSize int) error {
// create a listener on TCP port
lis, err := net.Listen("tcp", address)
if err != nil {
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Keep the server's PEM material in package-level variables.
pkgCertPEM = certPEM
pkgKeyPEM = privateKeyPEM
pkgCaCertPEM = caCertPEM
opts := []grpc.ServerOption{}
// NOTE(review): the receive limit below is taken from config.MaxMessageSize
// (multiplied by 1024*1024), not from the maxMessageSize parameter — confirm
// this is intended.
opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor),
grpc.MaxRecvMsgSize(config.MaxMessageSize*1024*1024))
if config.UseTLS {
// Load the certificates from PEM Strings
certificate, err := tls.X509KeyPair(certPEM, privateKeyPEM)
if err != nil {
log.Printf("Error: %v", err)
return fmt.Errorf("could not load server key pair: %s", err)
}
// Create a certificate pool from the certificate authority
// Get the SystemCertPool, continue with an empty pool on error
certPool, _ := x509.SystemCertPool()
if certPool == nil {
certPool = x509.NewCertPool()
}
if ok := certPool.AppendCertsFromPEM(caCertPEM); !ok {
return fmt.Errorf("failed to append server certs")
}
// Create the TLS credentials
creds := credentials.NewTLS(&tls.Config{
// NOTE(review): ClientAuth is commented out, so the config keeps the tls
// package's default client-auth policy even though ClientCAs is set —
// confirm client certificates are meant to be optional.
//ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
ClientCAs: certPool,
})
// Create an array of gRPC options with the credentials
opts = append(opts, grpc.Creds(creds))
}
// create a server instance
s := handler.KeyStorageServerImpl{
DataStorageUrl: dataStorageAddress,
CertPEM: certPEM,
KeyPEM: privateKeyPEM,
CaCertPEM: caCertPEM,
VereignCertPEM: vereignCertPEM,
VereignPrivateKeyPEM: vereignPrivateKeyPEM,
MaxMessageSize: maxMessageSize,
}
// create a gRPC server object
grpcServer := grpc.NewServer(opts...)
// attach the KeyStorage service to the server
api.RegisterKeyStorageServer(grpcServer, &s)
// start the server
log.Printf("starting HTTP/2 gRPC server on %s", address)
if err := grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %s", err)
}
return nil
}
// StartRESTServer serves the KeyStorage API over HTTP/1.1 on address through
// a grpc-gateway mux registered against the gRPC endpoint at grpcAddress.
//
// credMatcher is installed as the mux's incoming header matcher. The gateway
// dials grpcAddress with TLS credentials built from the system certificate
// pool (or a new empty pool when none is available) extended with certPEM.
//
// It returns an error when certPEM cannot be appended to the pool or when the
// gateway handler cannot be registered.
func StartRESTServer(address, grpcAddress string, certPEM []byte) error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(credMatcher))
// NOTE(review): the error returned by x509.SystemCertPool is not checked
// here; only a nil pool is handled.
certPool, err := x509.SystemCertPool()
if certPool == nil {
certPool = x509.NewCertPool()
// Append the client certificates from the CA
if ok := certPool.AppendCertsFromPEM(certPEM); !ok {
return fmt.Errorf("failed to append client certs")
}
creds := credentials.NewClientTLSFromCert(certPool, "")
// Setup the client gRPC options
opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
// Register the KeyStorage gateway handler
err = api.RegisterKeyStorageHandlerFromEndpoint(ctx, mux, grpcAddress, opts)
if err != nil {
return fmt.Errorf("could not register service RedisStorageServer: %s", err)
}
log.Printf("starting HTTP/1.1 REST server on %s", address)
// NOTE(review): the error returned by http.ListenAndServe is discarded, so
// this function returns nil even when the listener fails — consider
// returning it, as StartPrometheusServer does.
http.ListenAndServe(address, mux)
return nil
}
// StartPrometheusServer serves the Prometheus metrics handler at /metrics on
// config.PrometeusListenAddress.
//
// When that address is empty it does nothing and returns nil. Otherwise it
// returns whatever http.ListenAndServe returns.
func StartPrometheusServer() error {
	if config.PrometeusListenAddress == "" {
		return nil
	}

	// Dedicated mux so that only the metrics endpoint is exposed here.
	metricsMux := http.NewServeMux()
	metricsMux.Handle("/metrics", promhttp.Handler())

	log.Println("Starting prometheus...")
	return http.ListenAndServe(config.PrometeusListenAddress, metricsMux)
}