Skip to content
Snippets Groups Projects
Commit 2daa1e46 authored by Gospodin Bodurov's avatar Gospodin Bodurov
Browse files

Merge branch '18-http-client' into 'master'

[18] grpc server and Rest API http client architecture

See merge request !92
parents c9fc8a3f 960a1e53
No related branches found
Tags 1.5
1 merge request!92[18] grpc server and Rest API http client architecture
Showing
with 3982 additions and 1 deletion
......@@ -6,4 +6,11 @@ vendor/
temp/
yarn-error.log
/.project
cmake-build-debug/
/cpp/cmake-build*
/cpp/cmake-install*
/cpp/compile_commands.json
/cpp/.clangd
/cpp/tags
/cpp/docs/doxy
[submodule "cpp/proto"]
path = cpp/proto
url = git@code.vereign.com:code/vcl-proto.git
cmake-*
compile_commands.json
.clangd
cmake_minimum_required (VERSION 3.16.5)
if(WIN32)
set(CMAKE_IGNORE_PATH "C:/Strawberry/c/bin")
endif()
project (vereign)
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "19.0.24215.1")
message(FATAL_ERROR "Microsoft Visual C++ version MSVC 19.0.24215.1 required")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "9.0")
message(FATAL_ERROR "Insufficient clang version - clang 9.0+ required")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "7.5")
message(FATAL_ERROR "Insufficient GNU compiler version - 7.5+ required")
endif()
else()
message(WARNING "Unknown compiler...")
endif()
if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
add_definitions(-D_WIN32_WINNT=0x0601)
set(CMAKE_MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Release>:Release>")
set(CMAKE_MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Debug>:Debug>")
set(CMAKE_C_FLAGS "/DNDEBUG /DWIN32 /D_WINDOWS /W3")
set(CMAKE_CXX_FLAGS "/DNDEBUG /DWIN32 /D_WINDOWS /W3 /GR /EHsc")
set(CMAKE_C_FLAGS_DEBUG "/MTd /Zi /Ob0 /Od /RTC1")
set(CMAKE_CXX_FLAGS_DEBUG "/MTd /Zi /Ob0 /Od /RTC1")
set(CMAKE_CXX_FLAGS_RELEASE "/Gd /MT /O2 /Oi /Ot /Gy /Zi /GL")
set(CMAKE_C_FLAGS_RELEASE "/Gd /MT /O2 /Oi /Ot /Gy /Zi /GL")
if (CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:ICF /LTCG")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:ICF /LTCG")
set(CMAKE_STATIC_LINKER_FLAGS "${CMAKE_STATIC_LINKER_FLAGS} /LTCG")
endif()
endif()
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -Wall -Wextra -pedantic")
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0 -Wall -Wextra -pedantic")
endif()
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -Wall -Wextra -pedantic")
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0 -Wall -Wextra -pedantic")
endif()
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_EXTENSIONS OFF)
message("Generator: " "${CMAKE_GENERATOR}")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/${CMAKE_BUILD_TYPE}")
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake")
message("Build type: " "${CMAKE_BUILD_TYPE}")
message("CXX Flags: " "${CMAKE_CXX_FLAGS}")
message("CXX debug flags: " "${CMAKE_CXX_FLAGS_DEBUG}")
message("CXX release flags: " "${CMAKE_CXX_FLAGS_RELEASE}")
message("CXX linker flags: " "${CMAKE_EXE_LINKER_FLAGS}")
message("CXX dll linker flags: " "${CMAKE_SHARED_LINKER_FLAGS}")
message("CXX static linker flags: " "${CMAKE_STATIC_LINKER_FLAGS}")
string(TOLOWER "${CMAKE_BUILD_TYPE}" _build_type)
set(VENDOR_INSTALL_DIR ${CMAKE_SOURCE_DIR}/cmake-install-vendor-${_build_type} CACHE STRING "vendor directory")
message(STATUS "Using vendor install dir: ${VENDOR_INSTALL_DIR}")
#set(VENDOR_INSTALL_DIR /home/daniel/workspace/local)
set(_cmake_prefix_paths
${VENDOR_INSTALL_DIR}
${VENDOR_INSTALL_DIR}/grpc
)
set(CMAKE_PREFIX_PATH ${_cmake_prefix_paths} CACHE STRING "")
include(CMakeToolsHelpers OPTIONAL)
include(Helpers)
include(ProtoGenerate)
enable_testing()
find_package(fmt 6.2.0 REQUIRED)
set(OPENSSL_USE_STATIC_LIBS ON)
set(OPENSSL_ROOT_DIR ${VENDOR_INSTALL_DIR}/boringssl)
find_package(OpenSSL)
set(Boost_USE_STATIC_LIBS ON)
find_package(
Boost
1.72.0
EXACT
REQUIRED
COMPONENTS regex thread system
)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${protobuf_VERSION}")
if(CMAKE_CROSSCOMPILING)
find_program(PROTOBUF_PROTOC protoc)
else()
set(PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
endif()
# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")
if(CMAKE_CROSSCOMPILING)
find_program(GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
endif()
add_subdirectory("src")
add_subdirectory("tests")
get_target_property(_protobuflib_location protobuf::libprotobuf LOCATION)
get_target_property(_grpclib_location gRPC::grpc++ LOCATION)
get_target_property(_grpclib_reflection_location gRPC::grpc++_reflection LOCATION)
set(
_grpc_libs
${_protobuflib_location}
${_grpclib_location}
${_grpclib_reflection_location}
)
string(TOUPPER "${CMAKE_BUILD_TYPE}" _build_type)
message(STATUS "summary of build options:
Package version: ${VERSION}
Library version: ${LT_CURRENT}:${LT_REVISION}:${LT_AGE}
Install prefix: ${CMAKE_INSTALL_PREFIX}
Target system: ${CMAKE_SYSTEM_NAME}
Compiler:
Build type: ${CMAKE_BUILD_TYPE}
C compiler: ${CMAKE_C_COMPILER}
CFLAGS: ${CMAKE_C_FLAGS_${_build_type}} ${CMAKE_C_FLAGS}
C++ compiler: ${CMAKE_CXX_COMPILER}
CXXFLAGS: ${CMAKE_CXX_FLAGS_${_build_type}} ${CMAKE_CXX_FLAGS}
WARNCFLAGS: ${WARNCFLAGS}
CXX1XCXXFLAGS: ${CXX1XCXXFLAGS}
Libs:
fmt: ${fmt_FOUND} [${fmt_VERSION}] (DIR='${fmt_DIR}')
OpenSSL: ${OpenSSL_FOUND} [${OPENSSL_VERSION}] (LIBS='${OPENSSL_LIBRARIES}')
Zlib: ${ZLIB_FOUND} [${ZLIB_VERSION_STRING}] (LIBS='${ZLIB_LIBRARIES}')
Boost ${Boost_FOUND} [${Boost_VERSION_STRING}] (DIR='${Boost_DIR}')
Boost libs ${Boost_LIBRARIES}
gRPC ${gRPC_FOUND} [${gRPC_VERSION}] (LIBS='${_grpc_libs}')
")
function(dump_vars)
get_cmake_property(_variableNames VARIABLES)
list (SORT _variableNames)
foreach (_variableName ${_variableNames})
message(STATUS "${_variableName}=${${_variableName}}")
endforeach()
endfunction()
function(target_proto_generate)
cmake_parse_arguments(
PROTO
""
"TARGET;GEN_DIR;SRC_DIR"
"DEFINITIONS"
${ARGN}
)
message(STATUS "proto target: ${PROTO_TARGET}")
message(STATUS " proto output dir: ${PROTO_GEN_DIR}")
message(STATUS " proto src dir: ${PROTO_SRC_DIR}")
# FIXME: remove hardcoded googleapis include below
foreach (proto ${PROTO_DEFINITIONS})
get_filename_component(_proto ${proto} ABSOLUTE)
get_filename_component(_proto_path "${_proto}" PATH)
get_filename_component(_proto_name ${proto} NAME_WLE)
string(REPLACE ${PROTO_SRC_DIR} ${PROTO_GEN_DIR} _proto_gen_path ${_proto_path})
message(STATUS " proto generate [${_proto_name}] from ${_proto} to ${_proto_gen_path}")
# Generated sources
set(_proto_src "${_proto_gen_path}/${_proto_name}.pb.cc")
set(_proto_hdr "${_proto_gen_path}/${_proto_name}.pb.h")
add_custom_command(
OUTPUT "${_proto_src}" "${_proto_hdr}"
COMMAND ${PROTOBUF_PROTOC}
ARGS
--cpp_out "${PROTO_GEN_DIR}"
--proto_path "${PROTO_SRC_DIR}"
-I "${_proto_path}"
-I "${CMAKE_SOURCE_DIR}/src/vereign/proto/googleapis"
"${_proto}"
DEPENDS "${_proto}")
target_sources(${PROTO_TARGET} PRIVATE ${_proto_src})
endforeach()
endfunction()
function(target_grpc_generate)
cmake_parse_arguments(
PROTO
""
"TARGET;GEN_DIR;SRC_DIR"
"DEFINITIONS"
${ARGN}
)
message(STATUS "grpc target: ${PROTO_TARGET}")
message(STATUS " grpc output dir: ${PROTO_GEN_DIR}")
message(STATUS " grpc src dir: ${PROTO_SRC_DIR}")
# FIXME: remove hardcoded googleapis include below
foreach (proto ${PROTO_DEFINITIONS})
get_filename_component(_proto ${proto} ABSOLUTE)
get_filename_component(_proto_path "${_proto}" PATH)
get_filename_component(_proto_name ${proto} NAME_WLE)
string(REPLACE ${PROTO_SRC_DIR} ${PROTO_GEN_DIR} _proto_gen_path ${_proto_path})
message(STATUS " grpc generate [${_proto_name}] from ${_proto} to ${_proto_gen_path}")
# Generated sources
set(_proto_src "${_proto_gen_path}/${_proto_name}.grpc.pb.cc")
set(_proto_hdr "${_proto_gen_path}/${_proto_name}.grpc.pb.h")
add_custom_command(
OUTPUT "${_proto_src}" "${_proto_hdr}"
COMMAND ${PROTOBUF_PROTOC}
ARGS
--grpc_out "${PROTO_GEN_DIR}"
--cpp_out "${PROTO_GEN_DIR}"
--proto_path "${PROTO_SRC_DIR}"
-I "${_proto_path}"
-I "${CMAKE_SOURCE_DIR}/src/vereign/proto/googleapis"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXECUTABLE}"
"${_proto}"
DEPENDS "${_proto}")
target_sources(${PROTO_TARGET} PRIVATE ${_proto_src})
endforeach()
endfunction()
# fun:*main*
# fun:*foo*
# src:*csandbox.cc
fun:*registerReporter*
fun:_ZN5Catch16ReporterRegistry16registerReporterERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt10shared_ptrINS_16IReporterFactoryEE
fun:_ZN5Catch12_GLOBAL__N_111RegistryHub16registerReporterERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt10shared_ptrINS_16IReporterFactoryEE
fun:_ZThn8_N5Catch12_GLOBAL__N_111RegistryHub16registerReporterERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt10shared_ptrINS_16IReporterFactoryEE
# src:../tests/init_tests.cc
This diff is collapsed.
Subproject commit 910d2c372559e31a0f0df6a6e7048e4256d02028
if (fmt_FOUND)
get_target_property(FMT_INCLUDE_DIR fmt::fmt INTERFACE_INCLUDE_DIRECTORIES)
endif()
include_directories(
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_SOURCE_DIR}/src
${VENDOR_INSTALL_DIR}/include
${VENDOR_INSTALL_DIR}/boost/include
${FMT_INCLUDE_DIR}
${CMAKE_SOURCE_DIR}/proto/cpp
)
file(GLOB PROTO_SRC ${CMAKE_SOURCE_DIR}/proto/cpp/vereign/client_library/*.cc)
list(APPEND PROTO_SRC
${CMAKE_SOURCE_DIR}/proto/cpp/google/api/annotations.pb.cc
${CMAKE_SOURCE_DIR}/proto/cpp/google/api/http.pb.cc
${CMAKE_SOURCE_DIR}/proto/cpp/code.vereign.com/code/viam-apis/entities-management-agent/api/api.pb.cc
${CMAKE_SOURCE_DIR}/proto/cpp/code.vereign.com/code/viam-apis/versions/api/api.pb.cc
)
add_library(vereignproto STATIC ${PROTO_SRC})
target_link_libraries(
vereignproto
fmt::fmt
protobuf::libprotobuf
OpenSSL::SSL
OpenSSL::Crypto
$<$<CXX_COMPILER_ID:MSVC>:CRYPT32.LIB>
)
set(vereignlib_src
vereign/restapi/detail/http_reader.cc
vereign/restapi/client.cc
vereign/service/gen/passport_service.cc
vereign/grpc/server.cc
vereign/service/passport_service.cc
)
add_library(vereignlib STATIC ${vereignlib_src})
target_link_libraries(
vereignlib
vereignproto
gRPC::grpc++_reflection
gRPC::grpc++
)
set(csandbox_sources
csandbox.cc
)
#add_library(csandboxlib STATIC ${csandboxlib_src})
#target_link_libraries(csandboxlib ${LIBS})
add_executable(csandbox ${csandbox_sources})
target_link_libraries(csandbox
fmt::fmt
Boost::regex
Threads::Threads
OpenSSL::SSL
$<$<CXX_COMPILER_ID:MSVC>:CRYPT32.LIB>
)
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <iostream>
#include <type_traits>
#include <vector>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
namespace experiment {
/**
* Array with c++11 move semantics for research purposes.
*
* @note Do not use this in real code.
*/
class Array {
public:
Array() : arr_{nullptr}, size_{0} {
std::cout << "default construct " << to_s() << "\n";
}
Array(std::initializer_list<int> l)
: arr_(new int[l.size()]{})
, size_(l.size()) {
std::copy(l.begin(), l.end(), arr_);
std::cout << "create list " << to_s() << "\n";
}
Array(const std::vector<int>& src) {
arr_ = new int[src.size()]{};
size_ = src.size();
std::copy(src.begin(), src.end(), arr_);
std::cout << "create " << to_s() << "\n";
}
Array(const Array& other) : arr_(new int[other.size_]{}), size_{other.size_} {
std::copy(other.arr_, other.arr_ + other.size_, arr_);
std::cout << "copy construct " << to_s() << "\n";
}
Array& operator=(const Array& other) {
Array tmp{other};
swap(*this, tmp);
std::cout << "copy assign " << to_s() << "\n";
return *this;
}
Array(Array&& other) noexcept : arr_{other.arr_}, size_(other.size_) {
other.arr_ = nullptr;
other.size_ = 0;
std::cout << "move construct " << to_s() << "\n";
}
Array& operator=(Array&& other) noexcept {
swap(*this, other);
std::cout << "move assign " << to_s() << "\n";
return *this;
}
~Array() {
std::cout << "destroy " << to_s() << "\n";
if (arr_) {
delete [] arr_;
}
}
void reset() {
if (arr_) {
delete [] arr_;
arr_ = nullptr;
}
size_ = 0;
}
std::string to_s() const {
std::string ret = std::to_string(intptr_t(this)) + " ";
if (!arr_) {
return ret;
}
for (int i = 0; i < size_; i++) {
ret += std::to_string(arr_[i]) + " ";
}
return ret;
}
friend void swap(Array& lhs, Array& rhs) noexcept;
private:
int* arr_;
int size_;
};
void swap(Array& lhs, Array& rhs) noexcept {
using std::swap;
swap(lhs.arr_, rhs.arr_);
swap(lhs.size_, rhs.size_);
}
}
using namespace experiment;
int main(int , char* []) {
}
#ifndef __VEREIGN_CORE_RVREF_HH
#define __VEREIGN_CORE_RVREF_HH
namespace vereign {
namespace core {
#ifdef __WINDOWS__
#pragma warning(push)
#pragma warning(disable: 4521)
#endif
/**
* Rvalue reference wrapper.
*
* Wraps rvalue reference in copyable object.
* Thus allowing lambda to capture rvalue references.
*
* @code
* std::string foo = "foo";
* auto foo_ref MakeRvRef(foo);
* auto lambda = [foo_ref] () mutable {
* std::cout << foo_ref.get() << std::endl;
* };
* lambda();
* @endcode
*
*/
template <typename T>
class RvRef {
public:
RvRef() = delete;
RvRef& operator=(RvRef& other) = delete;
RvRef(T&& value) : value_{std::move(value)} {}
RvRef(RvRef& other) : value_{std::move(other.value_)} {}
RvRef(RvRef&& other) : value_{std::move(other.value_)} {}
/**
* Retrieve reference to the stored value.
*/
const T& Get() const noexcept {
return value_;
}
/**
* Retrieve reference to the stored value.
*/
T& Get() noexcept {
return value_;
}
/**
* Retrieve reference to the stored value.
*/
const T* operator->() const noexcept {
return &value_;
}
/**
* Retrieve reference to the stored value.
*/
T* operator->() noexcept {
return &value_;
}
private:
T value_;
};
#ifdef __WINDOWS__
#pragma warning(pop)
#endif
template <typename T>
RvRef<T> MakeRvRef(T&& value) {
return RvRef<T>(std::move(value));
}
}
}
#endif
#ifndef __VEREIGN_CORE_SCOPE_GUARD_HH
#define __VEREIGN_CORE_SCOPE_GUARD_HH
namespace vereign {
namespace core {
template<typename Func>
class ScopeGuard {
public:
static_assert(
std::is_nothrow_constructible<Func, Func&&>::value,
"func must be nothrow_constructible"
);
ScopeGuard(Func&& func) : func_{std::move(func)} {}
~ScopeGuard() noexcept {
func_();
}
private:
Func func_;
};
template<typename Func>
ScopeGuard<Func> MakeScopeGuard(Func&& func) {
return ScopeGuard<Func>{std::move(func)};
}
}
}
#endif
\ No newline at end of file
#ifndef __VEREIGN_GRPC_GEN_GEN_HH
#define __VEREIGN_GRPC_GEN_GEN_HH
#include <vereign/grpc/gen/passport_api.hh>
#endif
#ifndef __VEREIGN_GRPC_GEN_PASSPORT_API_HH
#define __VEREIGN_GRPC_GEN_PASSPORT_API_HH
#include <vereign/client_library/passport_api.gen.grpc.pb.h>
namespace vereign {
namespace grpc {
namespace gen {
template <class VereignService>
class PassportAPI : public client_library::PassportAPI::Service {
public:
using VereignServicePtr = std::unique_ptr<VereignService>;
PassportAPI(VereignServicePtr&& service)
: service_{std::move(service)}
{}
PassportAPI(const PassportAPI&) = delete;
PassportAPI& operator=(const PassportAPI&) = delete;
::grpc::Status ListPassports(
::grpc::ServerContext*,
const client_library::ListPassportsForm* request,
client_library::ListPassportsFormResponse* response
) override {
auto result_future = service_->ListPassports(request, response);
try {
result_future.get();
} catch (const std::exception& e) {
response->set_code("500");
response->set_status("Internal Service Error");
response->set_error(e.what());
} catch (...) {
response->set_code("500");
response->set_status("Internal Service Error");
response->set_error("Internal Service Error");
}
return ::grpc::Status::OK;
}
protected:
VereignServicePtr service_;
};
} // gen
} // namespace grpc
} // namespace vereign
#endif // __VEREIGN_GRPC_GEN_PASSPORT_API_HH
#ifndef __VEREIGN_GRPC_PASSPORT_API_HH
#define __VEREIGN_GRPC_PASSPORT_API_HH
#include <vereign/grpc/gen/passport_api.hh>
namespace vereign {
namespace grpc {
template <class VereignService>
class PassportAPI final : public gen::PassportAPI<VereignService> {
public:
using VereignServicePtr = std::unique_ptr<VereignService>;
PassportAPI(VereignServicePtr&& service)
: gen::PassportAPI<VereignService>{std::move(service)}
{}
PassportAPI(const PassportAPI&) = delete;
PassportAPI& operator=(const PassportAPI&) = delete;
::grpc::Status ListPassportsManually(
::grpc::ServerContext*,
const client_library::ListPassportsForm* request,
client_library::ListPassportsFormResponse* response
) override {
auto result_future = this->service_->ListPassports(request, response);
try {
result_future.get();
} catch (const std::exception& e) {
response->set_code("500");
response->set_status("Internal Service Error");
response->set_error(e.what());
} catch (...) {
response->set_code("500");
response->set_status("Internal Service Error");
response->set_error("Internal Service Error");
}
return ::grpc::Status::OK;
}
};
} // namespace grpc
} // namespace vereign
#endif // __VEREIGN_GRPC_PASSPORT_API_HH
#include "boost/asio/executor_work_guard.hpp"
#include "vereign/grpc/gen/passport_api.hh"
#include "vereign/service/gen/passport_service.hh"
#include <vereign/grpc/server.hh>
#include <vereign/restapi/client.hh>
#include <vereign/restapi/client_session.hh>
#include <vereign/service/gen/gen.hh>
#include <vereign/service/passport_service.hh>
#include <vereign/grpc/gen/gen.hh>
#include <vereign/grpc/passport_api.hh>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/security/server_credentials.h>
namespace vereign {
namespace grpc {
Server::Server(
const std::string& listenAddress,
const std::string& vereignHost,
const std::string& vereignPort,
// FIXME: the public key must come from a storage
const std::string& publicKey
)
: work_guard_{asio::make_work_guard(ioc_)},
ssl_context_{asio::ssl::context::tlsv12_client},
client_{std::make_unique<restapi::Client>(
ioc_, ssl_context_, vereignHost, vereignPort
)},
client_session_{std::make_unique<restapi::ClientSession>(
*client_, publicKey
)}
{
// FIXME: Verify the remote server's certificate
// ssl_context.set_verify_mode(ssl::verify_peer);
::grpc::ServerBuilder builder;
builder.AddListeningPort(
listenAddress,
::grpc::InsecureServerCredentials(),
&selected_port_
);
services_.emplace_back(
std::make_unique<grpc::PassportAPI<service::PassportService>>(
std::make_unique<service::PassportService>(*client_session_)
)
);
for (auto& service : services_) {
builder.RegisterService(service.get());
}
server_ = builder.BuildAndStart();
if (server_ == nullptr) {
throw std::runtime_error("server start failed");
}
server_thread_ = std::thread([this]() {
server_->Wait();
});
service_thread_ = std::thread([this]() {
ioc_.run();
});
}
Server::~Server() {
Shutdown();
}
void Server::Shutdown() {
client_session_->Close();
server_->Shutdown();
if (server_thread_.joinable()) {
server_thread_.join();
}
work_guard_.reset();
if (service_thread_.joinable()) {
service_thread_.join();
}
}
int Server::SelectedPort() const {
return selected_port_;
}
}
}
#ifndef __VEREIGN_GRPC_SERVER_API_HH
#define __VEREIGN_GRPC_SERVER_API_HH
#include <thread>
#include <grpcpp/server.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/executor_work_guard.hpp>
namespace vereign {
namespace restapi {
class Client;
class ClientSession;
}
namespace grpc {
namespace asio = boost::asio;
/**
* Server is a grpc server that provides the Vereign services.
*
* It bootstraps the grpc server, http client, http client session and
* Vereign services.
*/
class Server {
public:
/**
* Constructs and bootstraps the server.
*
* @param listenAddress gRPC listen address, for example "localhost:".
* @param vereignHost Vereign restapi host.
* @param vereignPort Vereign restapi port - https, 443...
*/
explicit Server(
const std::string& listenAddress,
const std::string& vereignHost,
const std::string& vereignPort,
// FIXME: the public key must come from a storage
const std::string& publicKey
);
/**
* Shutdowns the server.
*
* @see Server::Shutdown
*/
~Server();
// Disable copying
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
/**
* Shutdown the server.
*
* It will cancel all pending http requests to the Vereign restapi.
* Blocks for all pending gRPC handlers to finish.
*/
void Shutdown();
/**
* Returns the port that the gRPC server listens to.
*
* This is useful if you construct the server with ephemeral port.
* Then this method will return the port that the OS assigned to the gRPC
* socket.
*/
int SelectedPort() const;
private:
int selected_port_;
asio::io_context ioc_;
asio::executor_work_guard<asio::io_context::executor_type> work_guard_;
asio::ssl::context ssl_context_;
std::unique_ptr<vereign::restapi::Client> client_;
std::unique_ptr<vereign::restapi::ClientSession> client_session_;
std::vector<std::unique_ptr<::grpc::Service>> services_;
std::unique_ptr<::grpc::Server> server_;
std::thread server_thread_;
std::thread service_thread_;
};
}
}
#endif
#include "vereign/restapi/detail/post_task.hh"
#include <chrono>
#include <vereign/restapi/client.hh>
#include <vereign/restapi/detail/http_reader.hh>
#include <boost/asio/strand.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/dispatch.hpp>
namespace vereign {
namespace restapi {
constexpr auto defaultTimeout = std::chrono::seconds(30);
Client::Client(
boost::asio::io_context& ioc,
boost::asio::ssl::context& ssl_ctx,
const std::string& host,
const std::string& port
)
: executor_{asio::make_strand(ioc)},
ssl_ctx_{ssl_ctx},
resolver_{ioc},
reader_{nullptr},
host_{host},
port_{port},
expiry_time_{defaultTimeout},
connecting_{false},
writing_{false},
reading_{false},
closed_{false}
{
}
Client::Client(
boost::asio::io_context& ioc,
boost::asio::ssl::context& ssl_ctx,
const std::string& host,
const std::string& port,
std::chrono::nanoseconds expiry_time
)
: executor_{asio::make_strand(ioc)},
ssl_ctx_{ssl_ctx},
resolver_{ioc},
reader_{nullptr},
host_{host},
port_{port},
expiry_time_{expiry_time},
connecting_{false},
writing_{false},
reading_{false},
closed_{false}
{
}
Client::~Client() noexcept {
Close();
}
void Client::Close() {
asio::post(
executor_,
[this]() {
if (closed_) {
return;
}
closed_ = true;
if (stream_) {
beast::get_lowest_layer(*stream_).close();
stream_.reset();
}
}
);
}
const asio::executor& Client::GetExecutor() const {
return executor_;
}
void Client::addPostTask(detail::PostTaskBasePtr&& task) {
asio::post(
executor_,
[this, task = std::move(task)]() mutable {
if (closed_) {
task->Complete(detail::PostError{"client closed"});
return;
}
task_queue_.push_back(std::move(task));
if (task_queue_.size() == 1) {
doPost();
}
});
}
void Client::completeTask(
const boost::optional<detail::HttpResponse>& resp,
const detail::PostError& err
) {
if (task_queue_.empty()) {
return;
}
auto task = std::move(task_queue_.front());
task_queue_.pop_front();
if (!err) {
task->DecodeResponse(*resp);
}
asio::dispatch(executor_, [task = std::move(task), err]() mutable {
task->Complete(err);
});
}
void Client::handleError(const detail::PostError& err) {
completeTask(boost::none, err);
resetStream();
if (task_queue_.size() > 0) {
asio::post(
executor_,
[this]() {
doPost();
});
}
}
void Client::resolve() {
resolver_.async_resolve(
host_,
port_,
[this](beast::error_code ec, tcp::resolver::results_type results) {
if (ec) {
connecting_ = false;
handleError(ec.message());
return;
}
connect(results);
});
}
void Client::connect(tcp::resolver::results_type results) {
stream_ = std::make_shared<streamType>(executor_, ssl_ctx_);
reader_ = std::make_shared<detail::HttpReader>(stream_, expiry_time_);
using endpoint_t = tcp::resolver::results_type::endpoint_type;
beast::get_lowest_layer(*stream_).expires_after(expiry_time_);
auto stream = stream_;
beast::get_lowest_layer(*stream_).async_connect(
results,
[this, stream](beast::error_code ec, endpoint_t) {
if (ec) {
connecting_ = false;
handleError(ec.message());
return;
}
handshake();
}
);
}
void Client::handshake() {
auto stream = stream_;
stream_->async_handshake(
asio::ssl::stream_base::client,
[this, stream](beast::error_code ec) {
connecting_ = false;
if (ec) {
handleError(ec.message());
return;
}
doPost();
}
);
}
void Client::cancelAllTasks(const detail::PostError& err) {
while (task_queue_.size() > 0) {
auto task = std::move(task_queue_.front());
task_queue_.pop_front();
task->Complete(err);
}
}
void Client::doPost() {
if (writing_ || connecting_ || task_queue_.size() == 0) {
return;
}
if (closed_) {
cancelAllTasks(detail::PostError("client closed"));
return;
}
if (!stream_ || reader_->IsClosed()) {
connecting_ = true;
resolve();
return;
}
auto& task = task_queue_.front();
req_.emplace();
req_->method(beast::http::verb::post);
req_->version(11);
req_->target(task->Path());
req_->set(beast::http::field::host, host_);
req_->set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req_->set(beast::http::field::content_type, "application/json");
auto err = task->EncodeRequest(*req_);
if (err) {
handleError(err);
return;
}
req_->prepare_payload();
auto stream = stream_;
writing_ = true;
beast::get_lowest_layer(*stream_).expires_after(expiry_time_);
beast::http::async_write(
*stream_,
*req_,
[this, stream](beast::error_code ec, std::size_t bytes_transferred) {
writing_ = false;
boost::ignore_unused(bytes_transferred);
if (ec) {
handleError(ec.message());
return;
}
readResponse();
});
}
void Client::resetStream() {
if (!stream_) {
return;
}
beast::get_lowest_layer(*stream_).close();
stream_.reset();
}
void Client::readResponse() {
if (reading_) {
return;
}
reader_->AsyncRead(
[this](const detail::HttpResponse& resp, beast::error_code ec) {
reading_ = false;
if (ec) {
handleError(ec.message());
return;
}
completeTask(resp, boost::none);
if (task_queue_.size() > 0) {
asio::post(
executor_,
[this]() {
doPost();
});
}
});
}
}
}
#ifndef __VEREIGN_RESTAPI_CLIENT_HH
#define __VEREIGN_RESTAPI_CLIENT_HH
#include <chrono>
#include <type_traits>
#include <vereign/restapi/detail/post_task.hh>
#include <vereign/restapi/http_header.hh>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/http.hpp>
#include <google/protobuf/message.h>
#include <string>
#include <future>
#include <deque>
namespace vereign {
namespace restapi {
namespace beast = boost::beast;
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
using RequestPtr = std::unique_ptr<google::protobuf::Message>;
namespace detail {
class HttpReader;
} // namespace detail
/**
* Client is a http client for the Vereign Restful API.
*
* Internally the Client uses the [boost::beast][] library.
*
* The client provides both async and blocking APIs for making POST requests.
*
* The blocking methods return futures, and must not be called inside the
* async callbacks.Their purpose is to be used in threads outside of the
* Client's executor.
*
* The POST requests are queued and executed sequentially reusing single
* connection, reconnecting if needed.
*
* The connection is kept alive until there are requests, or when idle until
* specified timeout is expired.
*
* [boost::beast]: \boostlib/libs/beast/doc/html/index.html
*
* ### Thread Safety
*
* All public methods are thread safe.
*/
class Client {
public:
/**
* Constructs Client instance with default timeout (30s).
*
* @param ioc IO context.
* @param ssl_ctx SSL context.
* @param host Vereign restapi host.
* @param port Vereign restapi port. For example "https" or "443".
* @returns Client instance.
*/
Client(
asio::io_context& ioc,
asio::ssl::context& ssl_ctx,
const std::string& host,
const std::string& port
);
/**
* Constructs Client instance.
*
* @param ioc IO context.
* @param ssl_ctx SSL context.
* @param host Vereign restapi host.
* @param port Vereign restapi port. For example "https" or "443".
* @param expiry_time Connect, read, write expiration time.
* @returns Client instance.
*/
Client(
asio::io_context& ioc,
asio::ssl::context& ssl_ctx,
const std::string& host,
const std::string& port,
std::chrono::nanoseconds expiry_time
);
/**
* Closes the connection.
*
* @see Client::Close
*/
~Client() noexcept;
// Disable copying
Client(const Client&) = delete;
Client& operator=(const Client&) = delete;
/**
* Close closes the connection.
*
* The connection to the server is closed, and the current pending request
* is cancelled with an error. All other requests in the queue will be also
* cancelled. Subsequent calls to Client::Post or Client::PostAsync methods
* will return with error, i.e. the client won't execute any requests anymore.
*/
void Close();
/**
* GetExecutor returns the client's strand executor.
*
* @returns the client's strand executor.
*/
const asio::executor& GetExecutor() const;
/**
* Post makes a blocking post request.
*
* The passed `req` and `resp` parameters are moved in and returned back once,
* the returned future is resolved.
*
* @param path HTTP path, for example `/api/identity/loginWithPreviouslyAddedDevice`.
* @param req Request object that will be serialized as JSON body.
* It can be a const pointer or std::unique_ptr to protobuf message.
*
* @param resp Response object that will be used to decode the response.
* It can be a pointer or std::unique_ptr to protobuf message.
*
* @returns future that will be resolved with a result containing both
* the request and response objects originally passed to the `Post` call.
*/
template <class RequestPtrType, class ResponsePtrType>
auto Post(
const std::string& path,
RequestPtrType&& req,
ResponsePtrType&& resp
) {
using RequestPtr = typename std::remove_reference<RequestPtrType>::type;
using ResponsePtr = typename std::remove_reference<ResponsePtrType>::type;
using Result = PostResult<RequestPtr, ResponsePtr>;
std::promise<Result> promise{};
auto future = promise.get_future();
PostAsync(
path,
std::move(req),
std::move(resp),
[promise = std::move(promise)] (Result&& result) mutable {
promise.set_value(std::move(result));
}
);
return future;
}
/**
* Post makes a blocking post request with additional http headers.
*
* The passed `req` and `resp` parameters are moved in and returned back once,
* the returned future is resolved.
*
* @param path HTTP path, for example `/api/identity/loginWithPreviouslyAddedDevice`.
* @param req Request object that will be serialized as JSON body.
* It can be a const pointer or std::unique_ptr to protobuf message.
*
* @param resp Response object that will be used to decode the response.
* It can be a pointer or std::unique_ptr to protobuf message.
*
* @param headers HTTP headers to include in the request.
*
* @returns future that will be resolved with a result containing both
* the request and response objects originally passed to the `Post` call.
*/
template <class RequestPtrType, class ResponsePtrType>
auto Post(
const std::string& path,
RequestPtrType&& req,
ResponsePtrType&& resp,
std::vector<HttpHeader>&& headers
) {
using RequestPtr = typename std::remove_reference<RequestPtrType>::type;
using ResponsePtr = typename std::remove_reference<ResponsePtrType>::type;
using Result = PostResult<RequestPtr, ResponsePtr>;
std::promise<Result> promise{};
auto future = promise.get_future();
PostAsync(
path,
std::move(req),
std::move(resp),
std::move(headers),
[promise = std::move(promise)] (Result&& result) mutable {
promise.set_value(std::move(result));
}
);
return future;
}
/**
* PostAsync makes non blocking post request.
*
* The passed `req` and `resp` parameters are moved in and returned back once
* the CompletionFunc is called.
*
* @tparam CompletionFunc A completion functor - `void (Result&&)`, where
* the `Result` is restapi::PostResult <RequestPtr, ResponsePtr>, where
* RequestPtr is `std::remove_reference<RequestPtrType>::type` and
* ResponsePtr is `std::remove_reference<RequestPtrType>::type`.
*
* @param path HTTP path, for example `/api/identity/loginWithPreviouslyAddedDevice`.
* @param req Request object that will be serialized as JSON body.
* It can be a const pointer or std::unique_ptr to protobuf message.
*
* @param resp Response object that will be used to decode the response.
* It can be a pointer or std::unique_ptr to protobuf message.
*
* @param func Completion func, that will be called when the post request
* is finished.
*/
template <class RequestPtrType, class ResponsePtrType, class CompletionFunc>
void PostAsync(
const std::string& path,
RequestPtrType&& req,
ResponsePtrType&& resp,
CompletionFunc&& func
) {
using RequestPtr = typename std::remove_reference<RequestPtrType>::type;
using ResponsePtr = typename std::remove_reference<ResponsePtrType>::type;
addPostTask(
std::make_unique<detail::PostTask<RequestPtr, ResponsePtr, CompletionFunc>>(
path,
std::move(req),
std::move(resp),
std::move(func)
)
);
}
/**
* PostAsync makes non blocking post request with additional http headers.
*
* The passed `req` and `resp` parameters are moved in and returned back once
* the CompletionFunc is called.
*
* @tparam CompletionFunc A completion functor - `void (Result&&)`, where
* the `Result` is restapi::PostResult <RequestPtr, ResponsePtr>, where
* RequestPtr is `std::remove_reference<RequestPtrType>::type` and
* ResponsePtr is `std::remove_reference<RequestPtrType>::type`.
*
* @param path HTTP path, for example `/api/identity/loginWithPreviouslyAddedDevice`.
* @param req Request object that will be serialized as JSON body.
* It can be a const pointer or std::unique_ptr to protobuf message.
*
* @param resp Response object that will be used to decode the response.
* It can be a pointer or std::unique_ptr to protobuf message.
*
* @param headers HTTP headers to include in the request.
*
* @param func Completion func, that will be called when the post request
* is finished.
*/
template <class RequestPtrType, class ResponsePtrType, class CompletionFunc>
void PostAsync(
const std::string& path,
RequestPtrType&& req,
ResponsePtrType&& resp,
std::vector<HttpHeader>&& headers,
CompletionFunc&& func
) {
using RequestPtr = typename std::remove_reference<RequestPtrType>::type;
using ResponsePtr = typename std::remove_reference<ResponsePtrType>::type;
addPostTask(
std::make_unique<detail::PostTask<RequestPtr, ResponsePtr, CompletionFunc>>(
path,
std::move(req),
std::move(resp),
std::move(headers),
std::move(func)
)
);
}
private:
using streamType = beast::ssl_stream<beast::tcp_stream>;
void handleError(const boost::optional<std::string>& err);
void addPostTask(detail::PostTaskBasePtr&& task);
void completeTask(
const boost::optional<detail::HttpResponse>& resp,
const boost::optional<std::string>& err
);
void doPost();
void resolve();
void connect(tcp::resolver::results_type results);
void handshake();
void readResponse();
void resetStream();
void cancelAllTasks(const detail::PostError& err);
private:
// a strand that calls all completion handlers in single thread.
asio::executor executor_;
// ssl context used for the https connection.
asio::ssl::context& ssl_ctx_;
// all post requests are added as tasks in this queue.
std::deque<detail::PostTaskBasePtr> task_queue_;
// ssl stream used for the http requests.
std::shared_ptr<streamType> stream_;
// used for resolving the host name into ip address.
tcp::resolver resolver_;
// request object used for the async_write calls of the post requests.
boost::optional<detail::HttpRequest> req_;
// reads the requests responses.
std::shared_ptr<detail::HttpReader> reader_;
// http server host name.
std::string host_;
// http server port - https, 443 ...
std::string port_;
// connecting, read, write expiration time.
std::chrono::nanoseconds expiry_time_;
// indicates that the client is currently in connecting state.
bool connecting_;
// indicates that the client is currently writing a post request.
bool writing_;
// indicates that the client is currently waiting for response to be read.
bool reading_;
// indicates that the client is closed.
bool closed_;
};
} // namespace restapi
} // namespace vereign
#endif // __VEREIGN_RESTAPI_CLIENT_HH
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment