Skip to content
Snippets Groups Projects
client.cc 5.62 KiB
Newer Older
  • Learn to ignore specific revisions
  • #include "vereign/restapi/detail/post_task.hh"
    #include <chrono>
    #include <vereign/restapi/client.hh>
    
    #include <vereign/restapi/detail/http_reader.hh>
    
    #include <boost/asio/strand.hpp>
    #include <boost/beast/ssl.hpp>
    #include <boost/asio/ip/tcp.hpp>
    #include <boost/beast/version.hpp>
    #include <boost/asio/dispatch.hpp>
    
    namespace vereign {
    namespace restapi {
    
    constexpr auto defaultTimeout = std::chrono::seconds(30);
    
    Client::Client(
      boost::asio::io_context& ioc,
      boost::asio::ssl::context& ssl_ctx,
      const std::string& host,
      const std::string& port
    )
      : executor_{asio::make_strand(ioc)},
        ssl_ctx_{ssl_ctx},
        resolver_{ioc},
        reader_{nullptr},
        host_{host},
        port_{port},
        expiry_time_{defaultTimeout},
        connecting_{false},
        writing_{false},
        reading_{false},
        closed_{false}
    {
    }
    
    Client::Client(
      boost::asio::io_context& ioc,
      boost::asio::ssl::context& ssl_ctx,
      const std::string& host,
      const std::string& port,
      std::chrono::nanoseconds expiry_time
    )
      : executor_{asio::make_strand(ioc)},
        ssl_ctx_{ssl_ctx},
        resolver_{ioc},
        reader_{nullptr},
        host_{host},
        port_{port},
        expiry_time_{expiry_time},
        connecting_{false},
        writing_{false},
        reading_{false},
        closed_{false}
    {
    }
    
    Client::~Client() noexcept {
      Close();
    }
    
    void Client::Close() {
      asio::post(
        executor_,
        [this]() {
          if (closed_) {
            return;
          }
          closed_ = true;
    
          if (stream_) {
            beast::get_lowest_layer(*stream_).close();
            stream_.reset();
          }
        }
      );
    }
    
    const asio::executor& Client::GetExecutor() const {
      return executor_;
    }
    
    void Client::addPostTask(detail::PostTaskBasePtr&& task) {
      asio::post(
        executor_,
        [this, task = std::move(task)]() mutable {
          if (closed_) {
            task->Complete(detail::PostError{"client closed"});
            return;
          }
    
          task_queue_.push_back(std::move(task));
          if (task_queue_.size() == 1) {
            doPost();
          }
        });
    }
    
    void Client::completeTask(
      const boost::optional<detail::HttpResponse>& resp,
      const detail::PostError& err
    ) {
      if (task_queue_.empty()) {
        return;
      }
    
      auto task = std::move(task_queue_.front());
      task_queue_.pop_front();
      if (!err) {
        task->DecodeResponse(*resp);
      }
    
      asio::dispatch(executor_, [task = std::move(task), err]() mutable {
        task->Complete(err);
      });
    }
    
    void Client::handleError(const detail::PostError& err) {
      completeTask(boost::none, err);
      resetStream();
    
      if (task_queue_.size() > 0) {
        asio::post(
          executor_,
          [this]() {
            doPost();
          });
      }
    }
    
    void Client::resolve() {
      resolver_.async_resolve(
        host_,
        port_,
        [this](beast::error_code ec, tcp::resolver::results_type results) {
          if (ec) {
            connecting_ = false;
            handleError(ec.message());
            return;
          }
    
          connect(results);
        });
    }
    
    void Client::connect(tcp::resolver::results_type results) {
      stream_ = std::make_shared<streamType>(executor_, ssl_ctx_);
      reader_ = std::make_shared<detail::HttpReader>(stream_, expiry_time_);
    
      using endpoint_t = tcp::resolver::results_type::endpoint_type;
      beast::get_lowest_layer(*stream_).expires_after(expiry_time_);
      auto stream = stream_;
      beast::get_lowest_layer(*stream_).async_connect(
        results,
        [this, stream](beast::error_code ec, endpoint_t) {
          if (ec) {
            connecting_ = false;
            handleError(ec.message());
            return;
          }
    
          handshake();
        }
      );
    }
    
    void Client::handshake() {
      auto stream = stream_;
      stream_->async_handshake(
        asio::ssl::stream_base::client,
        [this, stream](beast::error_code ec) {
          connecting_ = false;
          if (ec) {
            handleError(ec.message());
            return;
          }
    
          doPost();
        }
      );
    }
    
    void Client::cancelAllTasks(const detail::PostError& err) {
      while (task_queue_.size() > 0) {
        auto task = std::move(task_queue_.front());
        task_queue_.pop_front();
    
        task->Complete(err);
      }
    }
    
    void Client::doPost() {
      if (writing_ || connecting_ || task_queue_.size() == 0) {
        return;
      }
    
      if (closed_) {
        cancelAllTasks(detail::PostError("client closed"));
    
        return;
      }
    
      if (!stream_ || reader_->IsClosed()) {
        connecting_ = true;
        resolve();
        return;
      }
    
      auto& task = task_queue_.front();
      req_.emplace();
      req_->method(beast::http::verb::post);
      req_->version(11);
      req_->target(task->Path());
      req_->set(beast::http::field::host, host_);
      req_->set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING);
      req_->set(beast::http::field::content_type, "application/json");
    
      auto err = task->EncodeRequest(*req_);
      if (err) {
        handleError(err);
        return;
      }
    
      req_->prepare_payload();
    
      auto stream = stream_;
      writing_ = true;
      beast::get_lowest_layer(*stream_).expires_after(expiry_time_);
      beast::http::async_write(
        *stream_,
        *req_,
        [this, stream](beast::error_code ec, std::size_t bytes_transferred) {
          writing_ = false;
          boost::ignore_unused(bytes_transferred);
    
          if (ec) {
            handleError(ec.message());
            return;
          }
    
          readResponse();
        });
    }
    
    void Client::resetStream() {
      if (!stream_) {
        return;
      }
    
      beast::get_lowest_layer(*stream_).close();
      stream_.reset();
    }
    
    void Client::readResponse() {
      if (reading_) {
        return;
      }
    
      reader_->AsyncRead(
        [this](const detail::HttpResponse& resp, beast::error_code ec) {
          reading_ = false;
    
          if (ec) {
            handleError(ec.message());
            return;
          }
    
          completeTask(resp, boost::none);
    
          if (task_queue_.size() > 0) {
            asio::post(
              executor_,
              [this]() {
                doPost();
              });
          }
        });
    }
    
    }
    }