diff --git a/cpp/.clang-format b/cpp/.clang-format new file mode 100644 index 0000000000000000000000000000000000000000..4314079c156529a96bc58ba0c63a1f2b890a1ddd --- /dev/null +++ b/cpp/.clang-format @@ -0,0 +1,33 @@ +--- +Language: Cpp +AlignAfterOpenBracket: AlwaysBreak +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignConsecutiveMacros: true +AlignOperands: false +AlignTrailingComments: true +AllowAllArgumentsOnNextLine: false +AllowAllConstructorInitializersOnNextLine: false +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Empty +AllowShortIfStatementsOnASingleLine: Never +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: true +AlwaysBreakTemplateDeclarations: Yes +BinPackArguments: false +BinPackParameters: false +ColumnLimit: 80 +ConstructorInitializerAllOnOneLineOrOnePerLine: false +ContinuationIndentWidth: 2 +Cpp11BracedListStyle: true +FixNamespaceComments: true +IndentWidth: 2 +ObjCBlockIndentWidth: 2 +PointerAlignment: Left +SpacesInSquareBrackets: false +Standard: Cpp11 +TabWidth: 2 +UseTab: Never \ No newline at end of file diff --git a/cpp/proto b/cpp/proto index c581306f8816e5e8b708c9097e402354bb41b84a..cfea384107dec09a91ae566481432bfd1b6702dc 160000 --- a/cpp/proto +++ b/cpp/proto @@ -1 +1 @@ -Subproject commit c581306f8816e5e8b708c9097e402354bb41b84a +Subproject commit cfea384107dec09a91ae566481432bfd1b6702dc diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 1e0e2b78089ee9986c27cdbed8308de8d7a7ffaa..205e69aee397da56add3ed4d5514932d4461c041 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -18,6 +18,8 @@ include_directories( ) file(GLOB PROTO_SRC ${CMAKE_SOURCE_DIR}/proto/cpp/vereign/client_library/*.cc) +file(GLOB PROTO_INTERNAL_SRC ${CMAKE_SOURCE_DIR}/proto/cpp/vereign/client_library/internal/*.cc) +list(APPEND PROTO_SRC ${PROTO_INTERNAL_SRC}) list(APPEND PROTO_SRC ${CMAKE_SOURCE_DIR}/proto/cpp/google/api/annotations.pb.cc ${CMAKE_SOURCE_DIR}/proto/cpp/google/api/http.pb.cc @@ -85,12 +87,12 @@ set(VEREIGNLIB_SRC vereign/kvstore/crypto_storage.cc vereign/kvstore/detail/value_encoder.cc + vereign/event/broker.cc vereign/identity/provider.cc vereign/service/identity_service.cc ) - if (LINUX) list(APPEND VEREIGNLIB_SRC vereign/kvstore/detail/linux_crypto_storage.cc @@ -148,9 +150,10 @@ set(csandbox_sources add_executable(csandbox ${csandbox_sources}) target_link_libraries(csandbox PRIVATE + vereignlib # OpenSSL::Crypto # OpenSSL::SSL - vereignlib + profiler # $<$<CXX_COMPILER_ID:MSVC>:Boost::date_time> # Boost::filesystem # Boost::file diff --git a/cpp/src/csandbox.cc b/cpp/src/csandbox.cc index 5b7057af6b90f36d495c2110344a6dfa3abd31d6..5cbbd22b6df93c5e2eab5ddd8e5d89a49e78efc4 100644 --- a/cpp/src/csandbox.cc +++ b/cpp/src/csandbox.cc @@ -1,22 +1,57 @@ -#include <vereign/crypto/cert.hh> +#include "vereign/client_library/event_types.pb.h" +#include "vereign/client_library/internal/event_types.pb.h" +#include "vereign/sync/channel.hh" +#include "vereign/container/bounded_queue.hh" +#include <thread> +// #include "vereign/event/message_pool.hh" +// #include <future> +// #include <thread> +// #include <vereign/crypto/cert.hh> -#include <vereign/crypto/rsa.hh> -#include <vereign/crypto/rand.hh> -#include <vereign/crypto/bio.hh> -#include <vereign/bytes/view_dump.hh> -#include <vereign/fs/util.hh> -#include <vereign/core/time.hh> -#include <boost/date_time.hpp> +// #include <vereign/crypto/rsa.hh> +// #include <vereign/crypto/rand.hh> +// #include <vereign/crypto/bio.hh> +// #include <vereign/bytes/view_dump.hh> +// #include <vereign/fs/util.hh> +// #include <vereign/core/time.hh> +// #include <boost/date_time.hpp> +// #include <boost/pool/object_pool.hpp> -#include <openssl/x509v3.h> +// #include <openssl/x509v3.h> -#include <chrono> -#include <iostream> -#include <fstream> +// #include <chrono> +// #include <iostream> +// #include <fstream> auto main(int argc, char** argv) -> int { argc = 0; argv = nullptr; - return 0; + int64_t iterations = int64_t(1000)*1000*1000*10; + + using clock = std::chrono::high_resolution_clock; + auto q = vereign::container::BoundedQueue<int>(10); + + auto start = clock::now(); + int64_t s = 0; + + for (int64_t i = 0; i < iterations; i++) { + if (q.IsFull()) { + q.PopFront(); + } + + q.PushBack(i); + } + + auto end = clock::now() - start; + + std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end).count() << std::endl; + std::cout << std::chrono::duration_cast<std::chrono::nanoseconds>(end).count()/iterations << std::endl; + + while (!q.IsEmpty()) { + s += q.Front(); + q.PopFront(); + } + + std::cout << s << std::endl; } diff --git a/cpp/src/vereign/container/bounded_queue.hh b/cpp/src/vereign/container/bounded_queue.hh new file mode 100644 index 0000000000000000000000000000000000000000..fbb2ab64b0110e3dd9d5ca0219c9902ab9ee06df --- /dev/null +++ b/cpp/src/vereign/container/bounded_queue.hh @@ -0,0 +1,139 @@ +#ifndef __VEREIGN_CONTAINER_BOUNDED_QUEUE_HH +#define __VEREIGN_CONTAINER_BOUNDED_QUEUE_HH + +#include <vector> + +namespace vereign::container { + +/** + * A queue (FIFO) data structure with fixed size. + * + * Being a fixed in its size, the BoundedQueue has the opportunity to internally allocate only once + * upon construction a single continues block of memory for holding the queue elements. + */ +template <typename T> +class BoundedQueue { +public: + /** + * The type of the values in the queue. + */ + using ValueType = T; + + // Helper type that is properly aligned so that it can act as a storage for the ValueType. + using StorageType = typename std::aligned_storage_t<sizeof(T), alignof(T)>; + + /** + * Creates a queue with given capacity. + * + * The capacity cannot be changed after construction. + */ + BoundedQueue(std::size_t capacity) + : begin_{0}, + size_{0}, + cap_{capacity}, + queue_{new StorageType[capacity]} + { + } + + /** + * Destroy the elements if any, and free all the memory. + */ + ~BoundedQueue() { + while (!IsEmpty()) { + PopFront(); + } + + delete [] queue_; + } + + // disable copying + BoundedQueue(const BoundedQueue&) = delete; + auto operator=(const BoundedQueue&) -> BoundedQueue& = delete; + + /** + * Retrieve the number of elements in the queue. + * + * @returns the number of elements in the queue. + */ + auto Size() -> std::size_t { + return size_; + } + + /** + * Check if the queue is empty. + * + * @returns true when the queue is empty, false otherwise. + */ + auto IsEmpty() -> bool { + return size_ == 0; + } + + /** + * Check if the queue is full up to its capacity. + * + * @returns true when the queue is full, false otherwise. + */ + auto IsFull() -> bool { + return size_ == cap_; + } + + /** + * Moves a value to the end of the queue. + * + * @note Calling BoundedQueue::PushBack on full queue, is undefined behaviour. + */ + void PushBack(ValueType&& value) { + auto pos = (begin_ + size_) % cap_; + + new(&queue_[pos]) T(std::move(value)); + + size_++; + } + + /** + * Copies a value to the end of the queue. + * + * @note Calling BoundedQueue::PushBack on full queue, is undefined behaviour. + */ + void PushBack(const ValueType& value) { + auto pos = (begin_ + size_) % cap_; + + new(&queue_[pos]) T(value); + + size_++; + } + + /** + * Access the first value in the queue. + * + * @note Calling BoundedQueue::Front on empty queue is undefined behaviour. + */ + auto Front() -> ValueType& { + return reinterpret_cast<ValueType&>(queue_[begin_]); + } + + /** + * Remove the fist item from the queue. + * + * @note Calling BoundedQueue::PopFront on empty queue is undefined behaviour. + */ + void PopFront() { + reinterpret_cast<ValueType*>(&queue_[begin_])->~ValueType(); + + size_--; + begin_ = (begin_ == cap_ - 1) ? 0 : begin_ + 1; + } + +private: + std::size_t begin_; + + std::size_t size_; + + std::size_t cap_; + + StorageType* queue_; +}; + +} // namespace vereign::container + +#endif // __VEREIGN_CONTAINER_BOUNDED_QUEUE_HH diff --git a/cpp/src/vereign/event/broker.cc b/cpp/src/vereign/event/broker.cc new file mode 100644 index 0000000000000000000000000000000000000000..f86e3e3932ec44b7d2cd0e943e5d1f2bcc499bab --- /dev/null +++ b/cpp/src/vereign/event/broker.cc @@ -0,0 +1,266 @@ +#include <vereign/event/broker.hh> + +#include <vereign/restapi/client_session.hh> +#include <vereign/identity/provider.hh> + +namespace { + +const auto pollInterval = std::chrono::seconds(1); + +const auto eventsModeEntity = std::string{"entity"}; +const auto eventsModeDeviceKey = std::string{"devicekey"}; + +const auto getNewEventsWithoutSessionPath = std::string{"/event/getNewEventsWithoutSession"}; +const auto getNewEventsPath = std::string{"/event/getNewEvents"}; + +const auto updateLastViewedWithoutSessionPath = std::string{"/event/updateLastViewedWithoutSession"}; +const auto updateLastViewedPath = std::string{"/event/updateLastViewed"}; + +auto eventBatchTime(vereign::client_library::GetNewEventsFormResponse& resp) -> int64_t { + int64_t max_time = 0; + for (auto& event : resp.data()) { + max_time = std::max(max_time, event.stamp()); + } + + return max_time; +} + +} + +namespace vereign::event { + +Broker::Broker(restapi::ClientSession& client_session) + : client_session_{client_session} +{ + entity_req_.set_mode(eventsModeEntity); + device_req_.set_mode(eventsModeDeviceKey); + + update_entity_req_.set_mode(eventsModeEntity); + update_device_req_.set_mode(eventsModeDeviceKey); + + dispatch_thread_ = std::thread([this] { + pollAndDispatch(); + }); +} + +auto Broker::Subscribe() -> Subscription { + auto Subscribe() -> Subscription; + auto channel = std::make_shared<sync::Channel<EventTypeSharedPtr>>(10); + + uint64_t id = 0; + + { + std::lock_guard<std::mutex> lock{mu_}; + + id = ++next_subscription_id_; + subscriptions_[id] = Subscription{}; + subscriptions_[id].ID = id; + subscriptions_[id].EventsChannel = channel; + } + + cv_.notify_one(); + + return subscriptions_[id]; +} + +void Broker::Unsubscribe(const Subscription& subscription) { + std::unique_lock<std::mutex> lock{mu_}; + + auto it = subscriptions_.find(subscription.ID); + if (it == subscriptions_.end()) { + return; + } + + it->second.EventsChannel->Close(); + + subscriptions_.erase(it); +} + +auto Broker::dispatchEvents(EventTypeSharedPtr events) -> int { + std::unique_lock<std::mutex> lock{mu_}; + cv_.wait(lock, [this] { + return subscriptions_.size() > 0; + }); + + int dispatched_cnt = 0; + + for (auto& s : subscriptions_) { + auto channel = s.second.EventsChannel; + + auto r = channel->TryAdd(events); + if (r.IsOk()) { + dispatched_cnt++; + } + } + + return dispatched_cnt; +} + +// NOTE: it is assumed that the mutex is already locked. +void Broker::closeAllSubscriptions() { + for (auto& s: subscriptions_) { + s.second.EventsChannel->Close(); + } +} + +void Broker::pollEventsWithoutSession() { + if (!client_session_.HasIdentity()) { + return; + } + + auto resp = std::make_shared<client_library::GetNewEventsFormResponse>(); + + auto result = client_session_.PublicPost( + getNewEventsWithoutSessionPath, + &device_req_, + resp.get() + ); + + result.wait(); + + // do not dispatch successful poll, when there are no new events + if (resp->code() == "200" && resp->data().size() == 0) { + return; + } + + auto cnt = dispatchEvents(resp); + // if nothing is dispatched, do not update the last viewed events + if (cnt == 0) { + return; + } + + auto time = eventBatchTime(*resp); + if (time == 0) { + return; + } + + update_device_req_.set_lastviewed(std::to_string(time)); + auto update_result = client_session_.PublicPost( + updateLastViewedWithoutSessionPath, + &update_device_req_, + &update_device_resp_ + ); + + update_result.wait(); + + // TODO: Add debug log here on failure. +} + +void Broker::pollEntityEvents() { + auto resp = std::make_shared<client_library::GetNewEventsFormResponse>(); + + auto result = client_session_.Post(getNewEventsPath, &entity_req_, resp.get()); + + result.wait(); + + // do not dispatch successful poll, when there are no new events + if (resp->code() == "200" && resp->data().size() == 0) { + return; + } + + auto cnt = dispatchEvents(resp); + // if nothing is dispatched, do not update the last viewed events + if (cnt == 0) { + return; + } + + auto time = eventBatchTime(*resp); + if (time == 0) { + return; + } + + update_entity_req_.set_lastviewed(std::to_string(time)); + auto update_result = client_session_.Post( + updateLastViewedPath, + &update_entity_req_, + &update_entity_resp_ + ); + + update_result.wait(); + + // TODO: Add debug log here on failure. +} + +void Broker::pollDeviceEvents() { + auto resp = std::make_shared<client_library::GetNewEventsFormResponse>(); + + auto result = client_session_.Post(getNewEventsPath, &device_req_, resp.get()); + + result.wait(); + + // do not dispatch successful poll, when there are no new events + if (resp->code() == "200" && resp->data().size() == 0) { + return; + } + + auto cnt = dispatchEvents(resp); + // if nothing is dispatched, do not update the last viewed events + if (cnt == 0) { + return; + } + + auto time = eventBatchTime(*resp); + if (time == 0) { + return; + } + + update_device_req_.set_lastviewed(std::to_string(time)); + auto update_result = client_session_.Post( + updateLastViewedPath, + &update_device_req_, + &update_device_resp_ + ); + + update_result.wait(); + + // TODO: Add debug log here on failure. +} + +void Broker::pollAndDispatch() { + for (;;) { + { + std::unique_lock<std::mutex> lock{mu_}; + cv_.wait(lock, [this] { + return subscriptions_.size() > 0 || stopped_ == true; + }); + + if (stopped_) { + closeAllSubscriptions(); + break; + } + } + + if (!client_session_.HasSession()) { + pollEventsWithoutSession(); + } else { + pollDeviceEvents(); + pollEntityEvents(); + } + + std::this_thread::sleep_for(pollInterval); + } +} + +void Broker::Shutdown() { + { + std::lock_guard<std::mutex> lock{mu_}; + + if (stopped_) { + return; + } + + stopped_ = true; + } + + cv_.notify_one(); + + if (dispatch_thread_.joinable()) { + dispatch_thread_.join(); + } +} + +Broker::~Broker() { + Shutdown(); +} + +} // namespace vereign::event diff --git a/cpp/src/vereign/event/broker.hh b/cpp/src/vereign/event/broker.hh new file mode 100644 index 0000000000000000000000000000000000000000..f8842dc209a989cb1d14f118dbc5ac63aa22a5e6 --- /dev/null +++ b/cpp/src/vereign/event/broker.hh @@ -0,0 +1,131 @@ +#ifndef __VEREIGN_EVENT_BROKER_HH +#define __VEREIGN_EVENT_BROKER_HH + +#include <vereign/sync/channel.hh> +#include <vereign/client_library/event_types.pb.h> +#include <vereign/client_library/internal/event_types.pb.h> +#include <vereign/client_library/common_types.pb.h> + +#include <thread> +#include <unordered_map> + +// forward declarations +namespace vereign::restapi { + +class ClientSession; + +} // namespace vereign + +namespace vereign::event { + +using EventType = client_library::GetNewEventsFormResponse; +using EventTypeSharedPtr = std::shared_ptr<EventType>; +using EventsChannelSharedPtr = std::shared_ptr<sync::Channel<EventTypeSharedPtr>>; + +/** + * Subscription that is returned, when calling Broker::Subscribe. + * + * Once the consumer is done consuming, it must call Broker::Unsubscribe, passing the + * Subscription object. + */ +struct Subscription { + /** + * Subscription id. + */ + uint64_t ID; + + /** + * Channel that the subscribers can use to receive dispatched events. + */ + EventsChannelSharedPtr EventsChannel; +}; + +/** + * Broker polls restful API for events and dispatches them to a set of subscribers. + * + * All public methods are thread safe. + */ +class Broker { +public: + /** + * Creates a broker. + * + * @param client_session The client session used for polling the restful API for new events. + */ + Broker(restapi::ClientSession& client_session); + + /** + * Shutdown the broker. + */ + ~Broker(); + + // disable copying + Broker(const Broker&) = delete; + auto operator=(const Broker&) -> Broker = delete; + + /** + * Subscribe for events. + * + * Once the consumer is done with the returned Subscription, it must call + * Broker::Unsubscribe, passing the Subscription object. + * + * @returns a subscription. + */ + auto Subscribe() -> Subscription; + + /** + * Remove a subscription. + * + * The broker stops to dispatch events to the `subscription` channel. + */ + void Unsubscribe(const Subscription& subscription); + + /** + * Shutdown the broker. + * + * All subscription channels will be closed. + * + * Shutdown is idempotent, meaning that it can be safely called more than once. + */ + void Shutdown(); + +private: + void pollEventsWithoutSession(); + void pollDeviceEvents(); + void pollEntityEvents(); + void pollAndDispatch(); + auto dispatchEvents(EventTypeSharedPtr events) -> int; + void closeAllSubscriptions(); + +private: + restapi::ClientSession& client_session_; + + std::thread dispatch_thread_; + + client_library::internal::GetNewEventsForm entity_req_; + + client_library::internal::GetNewEventsForm device_req_; + + client_library::internal::UpdateLastViewedForm update_entity_req_; + + client_library::internal::UpdateLastViewedForm update_device_req_; + + client_library::EmptyResponse update_entity_resp_; + + client_library::EmptyResponse update_device_resp_; + + // protects the fields that follow + std::mutex mu_; + + std::condition_variable cv_; + + bool stopped_ = false; + + int64_t next_subscription_id_ = 0; + + std::unordered_map<int64_t, Subscription> subscriptions_; +}; + +} // namespace vereign::event + +#endif // __VEREIGN_EVENT_BROKER_HH diff --git a/cpp/src/vereign/grpc/event_api.hh b/cpp/src/vereign/grpc/event_api.hh new file mode 100644 index 0000000000000000000000000000000000000000..42afa02396fec2ce80a280924e9d1d897f978be6 --- /dev/null +++ b/cpp/src/vereign/grpc/event_api.hh @@ -0,0 +1,86 @@ +#ifndef __VEREIGN_GRPC_EVENT_API_HH +#define __VEREIGN_GRPC_EVENT_API_HH + +#include <vereign/grpc/gen/event_api.hh> + +#include <vereign/event/broker.hh> +#include <vereign/core/scope_guard.hh> + +// #include <vereign/grpc/error_code.hh> +// #include <vereign/kvstore/errors.hh> +// #include <vereign/client_library/common_types.pb.h> +// #include <vereign/client_library/identity_types.pb.h> +#include <boost/core/ignore_unused.hpp> + +#include <thread> + +namespace vereign::grpc { + +/** + * Implementation of the gRPC `vereign::client_library::EventAPI::Service` service. + * + * Inherits all the API implementations from the generated gen::EventAPI and adds some + * additional implementations. + */ +template <class VereignService> +class EventAPI final : public gen::EventAPI<VereignService> { +public: + // API service name. + static constexpr const char* Name = gen::EventAPI<VereignService>::Name; + + using VereignServiceType = VereignService; + using VereignServicePtr = std::unique_ptr<VereignService>; + + /** + * Constructs EventAPI instance. + * + * @param service The client library Event service. + */ + EventAPI(VereignServicePtr&& service, event::Broker& events_broker) + : gen::EventAPI<VereignService>{std::move(service)}, + events_broker_{events_broker} + {} + + // disable copying + EventAPI(const EventAPI&) = delete; + auto operator=(const EventAPI&) -> EventAPI& = delete; + + /** + * Returns to the gRPC client a gRPC stream for receiving last available events. + */ + auto GetNewEvents( + ::grpc::ServerContext* ctx, + const client_library::GetNewEventsForm* req, + ::grpc::ServerWriter<client_library::GetNewEventsFormResponse>* resp_stream + ) -> ::grpc::Status override { + boost::ignore_unused(ctx); + boost::ignore_unused(req); + + auto subscription = events_broker_.Subscribe(); + auto unsubscribe = core::MakeScopeGuard([this, &subscription] { + events_broker_.Unsubscribe(subscription); + }); + auto channel = subscription.EventsChannel; + + for (;;) { + auto msg = channel->Get(); + if (!msg) { + break; + } + + auto ok = resp_stream->Write(*msg.Value()); + if (!ok) { + break; + } + } + + return ::grpc::Status::OK; + } + +private: + event::Broker& events_broker_; +}; + +} // namespace vereign::grpc + +#endif // __VEREIGN_GRPC_IDENTITY_API_HH diff --git a/cpp/src/vereign/grpc/server.cc b/cpp/src/vereign/grpc/server.cc index a20883987ce7586e61bacb63eda286956bbc8e3a..295756a1762790ca4926bf6bc079b18db6d69994 100644 --- a/cpp/src/vereign/grpc/server.cc +++ b/cpp/src/vereign/grpc/server.cc @@ -17,6 +17,7 @@ // manually written api #include <vereign/service/identity_service.hh> #include <vereign/grpc/identity_api.hh> +#include <vereign/grpc/event_api.hh> #include <grpcpp/server.h> #include <boost/asio/io_context.hpp> @@ -28,6 +29,12 @@ #include <boost/asio/executor_work_guard.hpp> +namespace { + +constexpr auto shutdownTimeout = std::chrono::seconds(2); + +} + namespace vereign::grpc { namespace asio = boost::asio; @@ -46,6 +53,7 @@ public: ioc_, ssl_context_, vereign_host, vereign_port )}, client_session_{std::make_unique<restapi::ClientSession>(*client_)}, + event_broker_{std::make_unique<event::Broker>(*client_session_)}, kvstorage_{nullptr}, crypto_storage_{nullptr}, identity_provider_{nullptr}, @@ -76,6 +84,12 @@ public: *client_session_, *identity_provider_ ); + services_registry_.RegisterIfNotExist( + std::make_unique<EventAPI<service::gen::EventService>>( + std::make_unique<service::gen::EventService>(*client_session_), + *event_broker_ + ) + ); // register all generated services grpc::gen::RegisterAll(*client_session_, services_registry_); @@ -97,9 +111,17 @@ public: } void Shutdown() { + if (stopped_.test_and_set()) { + return; + } + client_session_->Close(); - server_->Shutdown(); + event_broker_->Shutdown(); + + auto deadline = std::chrono::high_resolution_clock::now() + shutdownTimeout; + + server_->Shutdown(deadline); if (server_thread_.joinable()) { server_thread_.join(); } @@ -121,6 +143,7 @@ private: asio::ssl::context ssl_context_; std::unique_ptr<vereign::restapi::Client> client_; std::unique_ptr<vereign::restapi::ClientSession> client_session_; + std::unique_ptr<vereign::event::Broker> event_broker_; ServiceRegistry services_registry_; std::unique_ptr<kvstore::Storage> kvstorage_; std::unique_ptr<kvstore::CryptoStorage> crypto_storage_; @@ -128,6 +151,7 @@ private: std::unique_ptr<::grpc::Server> server_; std::thread server_thread_; std::thread service_thread_; + std::atomic_flag stopped_ = ATOMIC_FLAG_INIT; }; Server::Server( diff --git a/cpp/src/vereign/grpc/service_registry.hh b/cpp/src/vereign/grpc/service_registry.hh index 94c0fd355fc8a4c1e7bfae838a07b43f41a603ac..cc60016be9b268e3ad3572242736a73ce72334e6 100644 --- a/cpp/src/vereign/grpc/service_registry.hh +++ b/cpp/src/vereign/grpc/service_registry.hh @@ -13,6 +13,18 @@ class ServiceRegistry { public: void RegisterIntoBuilder(::grpc::ServerBuilder& builder); + template <class API> + auto RegisterIfNotExist(std::unique_ptr<API>&& api) -> bool { + auto it = services_.find(API::Name); + if (it != services_.end()) { + return false; + } + + services_[API::Name] = std::move(api); + + return true; + } + template <class API> auto RegisterIfNotExist(restapi::ClientSession& client_session) -> bool { auto it = services_.find(API::Name); diff --git a/cpp/src/vereign/identity/provider.cc b/cpp/src/vereign/identity/provider.cc index f50c045aeab7bfbcf63dbf647b13e547d4a0faa5..6e96552f319de93e66201bd4423752ce1868fb05 100644 --- a/cpp/src/vereign/identity/provider.cc +++ b/cpp/src/vereign/identity/provider.cc @@ -69,6 +69,8 @@ auto Provider::RecreateIdentity(const std::string& pin) -> std::string { bytes::Buffer encoded; encoding::base64::Encode(crypto::bio::View(public_key.get()), encoded); + has_identity_ = true; + return std::string{encoded.View().String()}; } @@ -83,6 +85,8 @@ auto Provider::LoadIdentity(const std::string& pin) -> std::string { bytes::Buffer encoded; encoding::base64::Encode(public_key.View(), encoded); + has_identity_ = true; + return std::string(encoded.View().String()); } diff --git a/cpp/src/vereign/identity/provider.hh b/cpp/src/vereign/identity/provider.hh index b49471733a536a36de849377c6caaf41716ca597..c20763d835204009ea56b2c541147f5b8e91a4c3 100644 --- a/cpp/src/vereign/identity/provider.hh +++ b/cpp/src/vereign/identity/provider.hh @@ -141,6 +141,8 @@ private: std::mutex mu_; kvstore::CryptoStorage& storage_; + + bool has_identity_ = false; }; } // namespace vereign::identity diff --git a/cpp/src/vereign/restapi/client.cc b/cpp/src/vereign/restapi/client.cc index e42834bb8dc48363d3b2a67bb38ca520b8c1ef2d..38f023c008b9a9c357fc430efa95fdc04083cebf 100644 --- a/cpp/src/vereign/restapi/client.cc +++ b/cpp/src/vereign/restapi/client.cc @@ -12,24 +12,23 @@ namespace { constexpr std::string_view httpUserAgent = "Vereign Client Library"; } -namespace vereign { -namespace restapi { +namespace vereign::restapi { constexpr auto defaultTimeout = std::chrono::seconds(30); Client::Client( boost::asio::io_context& ioc, boost::asio::ssl::context& ssl_ctx, - const std::string& host, - const std::string& port + std::string host, + std::string port ) : user_agent_{httpUserAgent}, executor_{asio::make_strand(ioc)}, ssl_ctx_{ssl_ctx}, resolver_{ioc}, reader_{nullptr}, - host_{host}, - port_{port}, + host_{std::move(host)}, + port_{std::move(port)}, expiry_time_{defaultTimeout}, connecting_{false}, writing_{false}, @@ -41,8 +40,8 @@ Client::Client( Client::Client( boost::asio::io_context& ioc, boost::asio::ssl::context& ssl_ctx, - const std::string& host, - const std::string& port, + std::string host, + std::string port, std::chrono::nanoseconds expiry_time ) : user_agent_{httpUserAgent}, @@ -50,8 +49,8 @@ Client::Client( ssl_ctx_{ssl_ctx}, resolver_{ioc}, reader_{nullptr}, - host_{host}, - port_{port}, + host_{std::move(host)}, + port_{std::move(port)}, expiry_time_{expiry_time}, connecting_{false}, writing_{false}, @@ -85,7 +84,7 @@ void Client::Close() { ); } -const asio::executor& Client::GetExecutor() const { +auto Client::GetExecutor() const -> const asio::executor& { return executor_; } @@ -161,7 +160,9 @@ void Client::connect(tcp::resolver::results_type results) { auto stream = stream_; beast::get_lowest_layer(*stream_).async_connect( results, - [this, stream](beast::error_code ec, endpoint_t) { + [this, stream](beast::error_code ec, endpoint_t endpoint) { + boost::ignore_unused(endpoint); + if (ec) { connecting_ = false; handleError(ec.message()); @@ -286,5 +287,4 @@ void Client::readResponse() { }); } -} -} +} // namespace vereign::restapi diff --git a/cpp/src/vereign/restapi/client.hh b/cpp/src/vereign/restapi/client.hh index 6fddf6037d58c5ce4f8c562ba7b00b273510ec75..c03304c3eb97b650b1635cf2cbbbe49e496baa2f 100644 --- a/cpp/src/vereign/restapi/client.hh +++ b/cpp/src/vereign/restapi/client.hh @@ -17,8 +17,7 @@ #include <chrono> #include <type_traits> -namespace vereign { -namespace restapi { +namespace vereign::restapi { namespace beast = boost::beast; namespace asio = boost::asio; @@ -69,8 +68,8 @@ public: Client( asio::io_context& ioc, asio::ssl::context& ssl_ctx, - const std::string& host, - const std::string& port + std::string host, + std::string port ); /** @@ -86,8 +85,8 @@ public: Client( asio::io_context& ioc, asio::ssl::context& ssl_ctx, - const std::string& host, - const std::string& port, + std::string host, + std::string port, std::chrono::nanoseconds expiry_time ); @@ -100,7 +99,7 @@ public: // Disable copying Client(const Client&) = delete; - Client& operator=(const Client&) = delete; + auto operator=(const Client&) -> Client& = delete; /** * Retrieve client http user agent string. @@ -126,7 +125,7 @@ public: * * @returns the client's strand executor. */ - const asio::executor& GetExecutor() const; + auto GetExecutor() const -> const asio::executor&; /** * Post makes a blocking post request. @@ -365,7 +364,6 @@ private: bool closed_; }; -} // namespace restapi -} // namespace vereign +} // namespace vereign::restapi #endif // __VEREIGN_RESTAPI_CLIENT_HH diff --git a/cpp/src/vereign/restapi/client_session.hh b/cpp/src/vereign/restapi/client_session.hh index a94fdce3cb020f7ea9c1a11eec61044c4ba7c83d..55ee67962299230e8a280016a8e5631c69f0a676 100644 --- a/cpp/src/vereign/restapi/client_session.hh +++ b/cpp/src/vereign/restapi/client_session.hh @@ -45,6 +45,8 @@ inline auto NoAuthError() -> AuthError { } struct Session { + std::string PublicKey; + std::string DeviceHash; std::string Token; std::string UUID; }; @@ -69,7 +71,10 @@ public: ClientSession(Client& client) : client_{client}, base_path_{"/api"}, - pub_key_{""} + uuid_{""}, + session_token_{""}, + pub_key_{""}, + device_hash_{""} {} /** @@ -79,9 +84,13 @@ public: * @param base_path API base path, for example `/api` * @returns ClientSession instance. */ - ClientSession(Client& client, std::string base_path) : client_{client}, + ClientSession(Client& client, std::string base_path) + : client_{client}, base_path_{std::move(base_path)}, - pub_key_{""} + uuid_{""}, + session_token_{""}, + pub_key_{""}, + device_hash_{""} {} // Disable copying. @@ -102,21 +111,24 @@ public: * * @param key The public key that will be used for authentication. */ - void SetPubKey(std::string key) { + void SetPubKey(std::string key, std::string hash) { std::lock_guard<std::mutex> l{mu_}; pub_key_ = std::move(key); + device_hash_ = std::move(hash); session_token_ = ""; uuid_ = ""; } /** - * Retrieve the current public key used for authentication. + * Checks if the client has identity. + * + * @returns true when there device identity exits, false otherwise. */ - auto GetPubKey() -> std::string { + auto HasIdentity() -> bool { std::lock_guard<std::mutex> l{mu_}; - return pub_key_; + return pub_key_.size() != 0; } /** @@ -139,7 +151,7 @@ public: } /** - * Post makes a blocking post request. + * Post makes an authenticated blocking post request. * * The passed `req` and `resp` parameters are moved in and returned back once, * the returned future is resolved. @@ -181,7 +193,7 @@ public: } /** - * PostAsync makes non blocking post request. + * PostAsync makes an authenticated non blocking post request. * * The passed `req` and `resp` parameters are moved in and returned back once * the CompletionFunc is called. @@ -225,6 +237,93 @@ public: ); } + /** + * PublicPostAsync makes a non authenticated non blocking post request. + * + * The request is without session, only the device identity - public key, device hash are used + * for this request. + * It is suitable for the public API. + * + * The passed `req` and `resp` parameters are moved in and returned back once + * the CompletionFunc is called. + * + * @tparam CompletionFunc A completion functor - `void (Result&&)`, where + * the `Result` is restapi::PostResult <RequestPtr, ResponsePtr>, where + * RequestPtr is `std::remove_reference<RequestPtrType>::type` and + * ResponsePtr is `std::remove_reference<RequestPtrType>::type`. + * + * @param path HTTP path, for example `/passport/listPassports`. + * @param req Request object that will be serialized as JSON body. + * It can be a const pointer or std::unique_ptr to protobuf message. + * + * @param resp Response object that will be used to decode the response. + * It can be a pointer or std::unique_ptr to protobuf message. + * + * @param func Completion func, that will be called when the post request + * is finished. + */ + template <class RequestPtrType, class ResponsePtrType, class CompletionFunc> + void PublicPostAsync( + const std::string& path, + RequestPtrType&& req, + ResponsePtrType&& resp, + CompletionFunc&& cf + ) { + + auto session = getSession(); + + client_.PostAsync( + base_path_ + path, + std::move(req), + std::move(resp), + std::vector<vereign::restapi::HttpHeader>{ + {"publicKey", session.PublicKey}, + {"deviceHash", session.DeviceHash} + }, + std::move(cf) + ); + } + + /** + * PublicPost makes a non authenticated blocking post request. + * + * The request is without session, only the device identity - public key, device hash are used + * for this request. + * It is suitable for the public API. + * + * The passed `req` and `resp` parameters are moved in and returned back once, + * the returned future is resolved. + * + * @param path HTTP path, for example `/passport/listPassports`. + * The path will be added to the ClientSession base path. + * @param req Request object that will be serialized as JSON body. + * It can be a const pointer or std::unique_ptr to protobuf message. + * + * @param resp Response object that will be used to decode the response. + * It can be a pointer or std::unique_ptr to protobuf message. + * + * @returns future that will be resolved with a result containing both + * the request and response objects originally passed to the `Post` call. + */ + template <class RequestPtrType, class ResponsePtrType> + auto PublicPost( + const std::string& path, + RequestPtrType&& req, + ResponsePtrType&& resp + ) { + auto session = getSession(); + + return client_.Post( + base_path_ + path, + std::move(req), + std::move(resp), + std::vector<vereign::restapi::HttpHeader>{ + {"publicKey", session.PublicKey}, + {"deviceHash", session.DeviceHash} + } + ); + } + /** * Initiate authentication. * @@ -233,13 +332,18 @@ public: * @param pub_key The public key to authenticate with. * @param resp Authentication response. */ - auto Authenticate(const std::string& pub_key, client_library::EmptyResponse* resp) { + auto Authenticate( + const std::string& pub_key, + const std::string& device_hash, + client_library::EmptyResponse* resp + ) { std::promise<void> promise; auto future = promise.get_future(); { std::lock_guard<std::mutex> l{mu_}; pub_key_ = pub_key; + device_hash_ = device_hash; session_token_ = ""; uuid_ = ""; } @@ -260,6 +364,17 @@ public: future.wait(); } + /** + * Checks if the client is authenticated. + * + * @returns true when there is successfully authenticated session, false otherwise. + */ + auto HasSession() -> bool { + std::lock_guard<std::mutex> l{mu_}; + + return session_token_.size() != 0; + } + /** * Retrieve the restful-api base path. * @@ -270,10 +385,12 @@ public: } private: - auto hasSession() -> bool { + + void destroySession() { std::lock_guard<std::mutex> l{mu_}; - return session_token_.size() != 0; + session_token_ = ""; + uuid_ = ""; } void updateSession(const std::string& token, const std::string& uuid) { @@ -287,6 +404,8 @@ private: std::lock_guard<std::mutex> l{mu_}; return detail::Session{ + pub_key_, + device_hash_, session_token_, uuid_ }; @@ -317,7 +436,8 @@ private: std::move(req), std::move(resp), std::vector<vereign::restapi::HttpHeader>{ - {"publicKey", GetPubKey()}, + {"publicKey", session.PublicKey}, + {"deviceHash", session.DeviceHash}, {"token", session.Token}, {"uuid", session.UUID} }, @@ -356,7 +476,8 @@ private: std::move(req), std::move(resp), std::vector<vereign::restapi::HttpHeader>{ - {"publicKey", GetPubKey()}, + {"publicKey", session.PublicKey}, + {"deviceHash", session.DeviceHash}, {"token", session.Token}, {"uuid", session.UUID} }, @@ -372,7 +493,10 @@ private: ) { using namespace std::placeholders; auto& resp = result.Response; + if (resp->code() == "400" && resp->status() == "Bad session") { + destroySession(); + withAuthentication( std::bind( &ClientSession::authPostRetryAsync<RequestPtr, ResponsePtr, CompletionFunc>, @@ -396,7 +520,8 @@ private: asio::post( client_.GetExecutor(), [this, cf = std::move(cf)]() mutable { - if (hasSession()) { + auto session = getSession(); + if (!session.Token.empty()) { cf(detail::NoAuthError()); return; } @@ -410,7 +535,8 @@ private: std::make_unique<client_library::EmptyRequest>(), std::make_unique<client_library::LoginFormPreviousAddedDeviceResponse>(), std::vector<vereign::restapi::HttpHeader>{ - {"publicKey", GetPubKey()} + {"publicKey", session.PublicKey}, + {"deviceHash", session.DeviceHash} }, [this, cf = std::move(cf)] (ResultType&& result) mutable { if (result.Response->code() != "200") { @@ -456,9 +582,11 @@ private: // session token std::string session_token_; - // public key used for creating the authenticated sessions std::string pub_key_; + + // public key hash + std::string device_hash_; }; } // namespace vereign diff --git a/cpp/src/vereign/restapi/detail/post_task.hh b/cpp/src/vereign/restapi/detail/post_task.hh index 14d91fcc2411db7bbec59bfd9ef601b7dd8e8e26..88b84794f1329bf69743cf4d69c6bab4472fe104 100644 --- a/cpp/src/vereign/restapi/detail/post_task.hh +++ b/cpp/src/vereign/restapi/detail/post_task.hh @@ -12,8 +12,7 @@ #include <google/protobuf/util/json_util.h> #include <fmt/core.h> -namespace vereign { -namespace restapi { +namespace vereign::restapi { namespace beast = boost::beast; @@ -109,6 +108,8 @@ public: void Complete(const PostError& err) override { if (err) { + resp_->set_code("500"); + resp_->set_status("http client error"); resp_->set_error(err.value()); } @@ -176,7 +177,6 @@ private: }; } // namespace detail -} // namespace restapi } // namespace vereign #endif // __VEREIGN_RESTAPI_DETAIL_POST_TASK_HH diff --git a/cpp/src/vereign/service/identity_service.cc b/cpp/src/vereign/service/identity_service.cc index fe39c2be506f67faa36dff971006e9780f35a641..9a23b616b7b843afaf409989f530c6fd5a68faeb 100644 --- a/cpp/src/vereign/service/identity_service.cc +++ b/cpp/src/vereign/service/identity_service.cc @@ -28,7 +28,7 @@ void IdentityService::LoginWithExistingPubKey( const client_library::LoginWithExistingPubKeyForm* req, client_library::EmptyResponse* resp ) { - client_session_.Authenticate(req->pubkey(), resp); + client_session_.Authenticate(req->pubkey(), "", resp); } void IdentityService::LoginWithNewDevice( @@ -50,7 +50,7 @@ void IdentityService::LoginWithNewDevice( result.wait(); if (resp->code() == "200") { - client_session_.SetPubKey(public_key); + client_session_.SetPubKey(public_key, identity_provider_.GetDeviceHash()); } } @@ -60,7 +60,7 @@ void IdentityService::LoginWithPreviouslyAddedDevice( client_library::EmptyResponse* resp ) { auto public_key = identity_provider_.LoadIdentity(req->pin()); - client_session_.Authenticate(public_key, resp); + client_session_.Authenticate(public_key, identity_provider_.GetDeviceHash(), resp); } } // namespace vereign diff --git a/cpp/src/vereign/sync/channel.hh b/cpp/src/vereign/sync/channel.hh index ff6ff439de10fa058ee1a80460cfc8feb1e69dd2..0e1544b29957820a0bd9218b2a8736618c3b9d27 100644 --- a/cpp/src/vereign/sync/channel.hh +++ b/cpp/src/vereign/sync/channel.hh @@ -1,13 +1,12 @@ #ifndef __VEREIGN_SYNC_CHANNEL_HH #define __VEREIGN_SYNC_CHANNEL_HH -#include <deque> +#include <vereign/container/bounded_queue.hh> + #include <condition_variable> #include <boost/optional.hpp> -#include <iostream> -namespace vereign { -namespace sync { +namespace vereign::sync { /** * ChannelAddResult is used as return value by Channel TryAdd methods. @@ -20,7 +19,8 @@ public: /** * Default constructor - the result is ok. */ - ChannelAddResult(): closed_(false), full_(false) {} + ChannelAddResult() = default; + /** * Creates ChannelAddResult. * @@ -34,7 +34,7 @@ public: * * @returns true when the add operation was successful. */ - bool IsOk() const noexcept { + auto IsOk() const noexcept -> bool { return !closed_ && !full_; } @@ -52,7 +52,7 @@ public: * * @returns true when the channel was closed. */ - bool IsClosed() const noexcept { + auto IsClosed() const noexcept -> bool { return closed_; } @@ -63,13 +63,13 @@ public: * * @returns true when the channel was full. */ - bool IsFull() const noexcept { + auto IsFull() const noexcept -> bool { return full_; } private: - bool closed_; - bool full_; + bool closed_ = false; + bool full_ = false; }; /** @@ -102,31 +102,31 @@ public: // The Channel Value is only move constructible and move assignable. ChannelValue(ChannelValue&&) = default; - ChannelValue& operator=(ChannelValue&&) = default; + auto operator=(ChannelValue&&) -> ChannelValue& = default; ChannelValue(const ChannelValue&) = delete; - ChannelValue& operator=(const ChannelValue&) = delete; + auto operator=(const ChannelValue&) -> ChannelValue& = delete; operator bool() const noexcept { return HasValue(); } - bool HasValue() const noexcept { + auto HasValue() const noexcept -> bool { return value_.has_value(); } - const ValueType& Value() const { + auto Value() const -> const ValueType& { return value_.value(); } - ValueType& Value() { + auto Value() -> ValueType& { return value_.value(); } - bool IsClosed() const noexcept { + auto IsClosed() const noexcept -> bool { return closed_; } - bool IsEmpty() const noexcept { + auto IsEmpty() const noexcept -> bool { return empty_; } @@ -164,7 +164,7 @@ public: */ explicit Channel(std::size_t capacity) : closed_{false}, - capacity_{capacity} + queue_{capacity} { } @@ -177,17 +177,17 @@ public: * @param value The value is moved into the new element. * @returns false when the channel is closed, and the value could not be pushed. */ - bool Add(ValueType&& value) { + auto Add(ValueType&& value) -> bool { std::unique_lock<std::mutex> lock(mu_); writers_cv_.wait(lock, [this]() { - return queue_.size() < capacity_ || closed_; + return !queue_.IsFull() || closed_; }); if (closed_) { return false; } - queue_.push_back(std::move(value)); + queue_.PushBack(std::move(value)); lock.unlock(); readers_cv_.notify_one(); @@ -204,17 +204,17 @@ public: * @param value The value is copied and added to the channel. * @returns false when the channel is closed, and the value could not be pushed. */ - bool Add(const ValueType& value) { + auto Add(const ValueType& value) -> bool { std::unique_lock<std::mutex> lock(mu_); writers_cv_.wait(lock, [this]() { - return queue_.size() < capacity_ || closed_; + return !queue_.IsFull() || closed_; }); if (closed_) { return false; } - queue_.push_back(value); + queue_.PushBack(value); lock.unlock(); readers_cv_.notify_one(); @@ -233,16 +233,16 @@ public: * @returns a add result, that can be used to check if the operation was * successful, and what was the channel state - closed, full. */ - ChannelAddResult TryAdd(ValueType&& value) { + auto TryAdd(ValueType&& value) -> ChannelAddResult { ChannelAddResult result; { std::lock_guard<std::mutex> lock(mu_); - result = ChannelAddResult{closed_, queue_.size() >= capacity_}; + result = ChannelAddResult{closed_, queue_.IsFull()}; if (!result) { return result; } - queue_.push_back(std::move(value)); + queue_.PushBack(std::move(value)); } readers_cv_.notify_one(); @@ -261,16 +261,16 @@ public: * @returns a add result, that can be used to check if the operation was * successful, and what was the channel state - closed, full. */ - ChannelAddResult TryAdd(const ValueType& value) { + auto TryAdd(const ValueType& value) -> ChannelAddResult { ChannelAddResult result; { std::lock_guard<std::mutex> lock(mu_); - result = ChannelAddResult{closed_, queue_.size() >= capacity_}; + result = ChannelAddResult{closed_, queue_.IsFull()}; if (!result) { return result; } - queue_.push_back(value); + queue_.PushBack(value); } readers_cv_.notify_one(); @@ -280,30 +280,31 @@ public: /** * Get retrieves a value from the channel. - * If the channel is empty, this call blocks until there is something added - * into the channel or the channel is empty but closed. + * + * If the channel is empty, this call blocks until there is something added into the channel or + * the channel is empty but closed. * * @returns the retrieved value. * The retrieved value has optional semantics. * One must check if the value exists before using it. * See the ChannelValue class docs. */ - ChannelValue<ValueType> Get() { + auto Get() -> ChannelValue<ValueType> { std::unique_lock<std::mutex> lock(mu_); readers_cv_.wait(lock, [this]() { - return queue_.size() > 0 || closed_; + return !queue_.IsEmpty() || closed_; }); - if (queue_.size() == 0) { - return ChannelValue<ValueType>{closed_, queue_.size() == 0}; + if (queue_.IsEmpty()) { + return ChannelValue<ValueType>{closed_, true}; } auto result = ChannelValue<ValueType>{ - std::move(queue_.front()), + std::move(queue_.Front()), closed_, - queue_.size() == 0, + queue_.IsEmpty(), }; - queue_.pop_front(); + queue_.PopFront(); lock.unlock(); writers_cv_.notify_one(); @@ -339,16 +340,10 @@ private: // signify if the channel is closed. bool closed_; - // maximum number of elements buffered inside the channel. - std::size_t capacity_; - // the internal queue used by the channel. - // TODO: replace the deque with circular buffer since the channel's capacity - // is fixed and known during the construction time. - std::deque<ValueType> queue_; + container::BoundedQueue<ValueType> queue_; }; -} -} +} // namespace vereign::sync -#endif +#endif // __VEREIGN_SYNC_CHANNEL_HH diff --git a/cpp/tests/vereign/CMakeLists.txt b/cpp/tests/vereign/CMakeLists.txt index b7c43c069858265bed7a23715e8832f9b4505105..913e3148a3e61291ac1214e1bf07aa0bfdb7a10d 100644 --- a/cpp/tests/vereign/CMakeLists.txt +++ b/cpp/tests/vereign/CMakeLists.txt @@ -17,6 +17,7 @@ list(APPEND TESTS_SRC core/time_test.cc sync/channel_test.cc + container/bounded_queue_test.cc encoding/base64_test.cc encoding/hex_test.cc @@ -45,6 +46,7 @@ list(APPEND TESTS_SRC grpc/server_test.cc grpc/json/encoder_test.cc grpc/identity_api_test.cc + grpc/event_api_test.cc ) if (WIN32) diff --git a/cpp/tests/vereign/container/bounded_queue_test.cc b/cpp/tests/vereign/container/bounded_queue_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..c4aff6ddeb20233180c8e05bdb26b45b94e692b8 --- /dev/null +++ b/cpp/tests/vereign/container/bounded_queue_test.cc @@ -0,0 +1,81 @@ +#include <vereign/container/bounded_queue.hh> + +#include <catch2/catch.hpp> + +using namespace vereign; + +TEST_CASE("container::BoundedQueue", "[vereign/container]") { + // a test type, that can confirm that the queue properly calls its destructor and there + // are no memory leaks + class Value { + public: + Value(int v) : v_{std::make_unique<int>(v)} {} + + auto Get() -> int { + return *v_; + } + private: + std::unique_ptr<int> v_; + }; + + auto q = container::BoundedQueue<Value>{3}; + + CHECK(q.Size() == 0); + CHECK(q.IsEmpty() == true); + CHECK(q.IsFull() == false); + + q.PushBack(1); + CHECK(q.Size() == 1); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + q.PushBack(2); + CHECK(q.Size() == 2); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + q.PushBack(3); + CHECK(q.Size() == 3); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == true); + + auto v = std::move(q.Front()); + q.PopFront(); + CHECK(v.Get() == 1); + CHECK(q.Size() == 2); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + v = std::move(q.Front()); + q.PopFront(); + CHECK(v.Get() == 2); + CHECK(q.Size() == 1); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + q.PushBack(4); + CHECK(q.Size() == 2); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + v = std::move(q.Front()); + q.PopFront(); + CHECK(v.Get() == 3); + CHECK(q.Size() == 1); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); + + v = std::move(q.Front()); + q.PopFront(); + CHECK(v.Get() == 4); + CHECK(q.Size() == 0); + CHECK(q.IsEmpty() == true); + CHECK(q.IsFull() == false); + + // leave something in the queue, for memory sanitizer check of the queue destruction + q.PushBack(5); + CHECK(q.Size() == 1); + CHECK(q.IsEmpty() == false); + CHECK(q.IsFull() == false); +} + diff --git a/cpp/tests/vereign/grpc/event_api_test.cc b/cpp/tests/vereign/grpc/event_api_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..a1220024fe11cb51c2af899fc2779420e178f928 --- /dev/null +++ b/cpp/tests/vereign/grpc/event_api_test.cc @@ -0,0 +1,96 @@ +#include <vereign/grpc/server.hh> + +#include <vereign/kvstore/sqlite_storage.hh> +#include <vereign/grpc/error_code.hh> +#include <vereign/core/scope_guard.hh> +#include <vereign/client_library/types.gen.pb.h> +#include <vereign/client_library/passport_api.gen.grpc.pb.h> +#include <vereign/client_library/identity_api.gen.grpc.pb.h> +#include <vereign/client_library/event_api.gen.grpc.pb.h> +#include <vereign/service/identity_service.hh> +#include <vereign/fs/util.hh> +#include <vereign/fs/path.hh> +#include <vereign/test/device.hh> +#include <vereign/test/service_context.hh> + +#ifdef _WIN32 +# include <vereign/ncrypt/rsa.hh> +#endif + +#include <testutil/env.hh> +#include <testutil/protobuf.hh> +#include <grpcpp/create_channel.h> + +#include <catch2/catch.hpp> + +TEST_CASE("grpc::EventAPI::GetNewEvents", "[vereign/grpc][.integration]") { + SECTION("get events while not logged in") { + auto public_key = vereign::testutil::RequireEnv("TEST_VEREIGN_PUB_KEY"); + auto host = vereign::testutil::RequireEnv("TEST_VEREIGN_API_HOST"); + auto port = vereign::testutil::GetEnv("TEST_VEREIGN_API_PORT", "https"); + + const std::string pin = "foo"; + auto storage_path = vereign::fs::TempDir("test_db_"); + auto rm_storage_path = vereign::fs::RemoveAllGuard(storage_path); + + vereign::grpc::Server server{"localhost:", host, port, storage_path}; + auto on_exit = vereign::core::MakeScopeGuard([&server] { + server.Shutdown(); + }); + + auto channel = ::grpc::CreateChannel( + "localhost:" + std::to_string(server.SelectedPort()), + ::grpc::InsecureChannelCredentials() + ); + + // start listening for events, and exit when the device is confirmed + std::thread t1([&channel]() { + auto event_client = vereign::client_library::EventAPI::NewStub(channel); + auto req = vereign::client_library::GetNewEventsForm{}; + auto ctx = ::grpc::ClientContext{}; + auto resp_stream = event_client->GetNewEvents(&ctx, req); + + auto resp = vereign::client_library::GetNewEventsFormResponse{}; + for (;;) { + resp.Clear(); + auto ok = resp_stream->Read(&resp); + if (!ok) { + break; + } + + REQUIRE(resp.data().size() > 0); + for (auto& event : resp.data()) { + if (event.type() == "DeviceConfirmed") { + return; + } + } + + // std::cout << vereign::testutil::ProtobufToJson(resp) << std::endl; + } + }); + + auto identity_client = vereign::client_library::IdentityAPI::NewStub(channel); + auto login_req = vereign::client_library::LoginFormNewDevice{}; + auto login_resp = vereign::client_library::LoginFormNewDeviceResponse{}; + login_req.set_pin(pin); + + ::grpc::ClientContext login_ctx; + identity_client->LoginWithNewDevice(&login_ctx, login_req, &login_resp); + + CHECK(login_resp.error() == ""); + CHECK(login_resp.status() == "OK"); + REQUIRE(login_resp.code() == "200"); + + // the old device is used for new device confirmation and authorization + auto old_storage_path = vereign::fs::TempFilePath("test_db_"); + auto rm_old_storage_path = vereign::fs::RemoveFileGuard{old_storage_path}; + auto old_device_ctx = vereign::test::ServiceContext{host, port, old_storage_path}; + auto old_device = vereign::test::Device{old_device_ctx}; + old_device.Login(public_key); + + // confirm the new device using an old device + old_device.ConfirmNewDevice(login_resp.data().qrcode(), login_resp.data().actionid()); + + t1.join(); + } +} diff --git a/cpp/tests/vereign/restapi/client_session_test.cc b/cpp/tests/vereign/restapi/client_session_test.cc index 88b361497a33b2c64e7e51677b12faa1fcf537b5..1730d834665459f72009e2b08e1ce702fa24f7cc 100644 --- a/cpp/tests/vereign/restapi/client_session_test.cc +++ b/cpp/tests/vereign/restapi/client_session_test.cc @@ -24,7 +24,7 @@ TEST_CASE("restapi::ClientSession::Post", "[vereign/restapi/client_session][.int vereign::restapi::Client client{ioc, ctx, host, port}; vereign::restapi::ClientSession client_session{client}; - client_session.SetPubKey(publicKey); + client_session.SetPubKey(publicKey, ""); std::thread ioc_thread([&ioc]{ ioc.run();