Skip to content
Snippets Groups Projects
channel_test.cc 4.56 KiB
Newer Older
  • Learn to ignore specific revisions
  • #include <vereign/sync/channel.hh>
    #include <thread>
    
    #include "catch.hpp"
    
    // Smoke test: a Channel can be constructed (and immediately destroyed) for
    // both a copyable element type and a move-only element type.
    TEST_CASE("Channel::Channel", "[vereign/sync/channel]") {
      using CopyableChannel = vereign::sync::Channel<std::string>;
      using MoveOnlyChannel = vereign::sync::Channel<std::unique_ptr<std::string>>;

      CopyableChannel{10};
      MoveOnlyChannel{10};
    }
    
    // One producer thread adds 100 strings and then closes the channel while one
    // consumer thread drains it. The received sequence must equal the sent one.
    TEST_CASE("Channel general usage", "[vereign/sync/channel]") {
      vereign::sync::Channel<std::string> ch{10};

      std::vector<std::string> input;
      for (int n = 0; n < 100; ++n) {
        input.emplace_back(std::to_string(n));
      }

      // Producer: pushes a copy of every input element, then signals completion.
      std::thread producer{[&ch, &input]() {
        for (const auto& text : input) {
          ch.Add(std::string(text));
        }
        ch.Close();
      }};

      // Consumer: keeps reading until the channel reports closed and empty.
      std::vector<std::string> messages;
      std::thread consumer{[&ch, &messages]() {
        while (true) {
          auto item = ch.Get();
          if (item) {
            messages.push_back(item.Value());
          }

          const bool drained = item.IsClosed() && item.IsEmpty();
          if (drained) {
            break;
          }
        }
      }};

      producer.join();
      consumer.join();

      REQUIRE_THAT(messages, Catch::Equals(input));
    }
    
    // Values added before Close() must still be retrievable afterwards, in the
    // order they were added.
    TEST_CASE("Drain closed channel", "[vereign/sync/channel]") {
      vereign::sync::Channel<std::string> ch{10};
      std::vector<std::string> input;
      for (int i = 0; i < 10; i++) {
        // Record what is sent so the final comparison is against the real
        // expected sequence rather than an empty vector.
        input.push_back(std::to_string(i));
        ch.Add(std::string(input.back()));
      }
      ch.Close();

      std::vector<std::string> messages;
      for (;;) {
        auto val = ch.Get();
        if (val) {
          messages.push_back(val.Value());
        }

        if (val.IsClosed() && val.IsEmpty()) {
          // Leave only the loop: a `return` here would exit the test case
          // before the assertion below is evaluated.
          break;
        }
      }

      REQUIRE_THAT(messages, Catch::Equals(input));
    }
    
    // One producer feeds 100 integers to ten concurrent consumers. Every value
    // must be received exactly once; the collected values are sorted before the
    // comparison because the consumers append to the shared vector concurrently.
    TEST_CASE("Multiple consumers", "[vereign/sync/channel]") {
      vereign::sync::Channel<int> ch{10};

      std::vector<int> input;
      for (int n = 0; n < 100; ++n) {
        input.push_back(n);
      }

      std::thread producer{[&ch, &input]() {
        for (const int number : input) {
          ch.Add(number);
        }
        ch.Close();
      }};

      std::mutex messages_mu;  // guards `messages`
      std::vector<int> messages;
      auto drain = [&ch, &messages, &messages_mu]() {
        while (true) {
          auto item = ch.Get();
          if (item) {
            std::lock_guard<std::mutex> guard{messages_mu};
            messages.push_back(item.Value());
          }

          const bool drained = item.IsClosed() && item.IsEmpty();
          if (drained) {
            break;
          }
        }
      };

      std::vector<std::thread> consumers;
      for (int n = 0; n < 10; ++n) {
        consumers.emplace_back(drain);
      }

      producer.join();
      for (auto& consumer : consumers) {
        consumer.join();
      }

      std::sort(messages.begin(), messages.end());

      REQUIRE_THAT(messages, Catch::Equals(input));
    }
    
    // Ten consumers read from a channel that never receives a value. Closing the
    // channel from another thread must let every consumer finish; the test
    // passes if all joins complete.
    TEST_CASE("Unblock all consumers on close", "[vereign/sync/channel]") {
      vereign::sync::Channel<int> ch{10};

      auto wait_for_close = [&ch]() {
        while (true) {
          auto item = ch.Get();
          if (item) {
            // Nothing is ever added in this test, so there is nothing to do.
          }

          const bool drained = item.IsClosed() && item.IsEmpty();
          if (drained) {
            break;
          }
        }
      };

      std::vector<std::thread> consumers;
      for (int n = 0; n < 10; ++n) {
        consumers.emplace_back(wait_for_close);
      }

      std::thread closer{[&ch]() {
        ch.Close();
      }};
      closer.join();

      for (auto& consumer : consumers) {
        consumer.join();
      }
    }
    
    // Exercises TryAdd with an lvalue argument through the states of a
    // two-slot channel: accepted twice, rejected as full, rejected as closed and
    // full, and rejected as closed on a channel that was closed while empty.
    TEST_CASE("Channel::TryAdd const reference", "[vereign/sync/channel]") {
      // Must be initialized: the value is copied into the channel, and reading
      // an uninitialized int is undefined behaviour.
      int value = 42;
      vereign::sync::Channel<int> ch{2};

      // First slot.
      auto result = ch.TryAdd(value);
      CHECK(result);
      CHECK(result.IsOk());
      CHECK_FALSE(result.IsClosed());
      CHECK_FALSE(result.IsFull());

      // Second slot.
      result = ch.TryAdd(value);
      CHECK(result);
      CHECK(result.IsOk());
      CHECK_FALSE(result.IsClosed());
      CHECK_FALSE(result.IsFull());

      // No free slot left: rejected as full.
      result = ch.TryAdd(value);
      CHECK_FALSE(result);
      CHECK_FALSE(result.IsOk());
      CHECK_FALSE(result.IsClosed());
      CHECK(result.IsFull());

      // Closed while still full: both conditions are reported.
      ch.Close();
      result = ch.TryAdd(value);
      CHECK_FALSE(result);
      CHECK_FALSE(result.IsOk());
      CHECK(result.IsClosed());
      CHECK(result.IsFull());

      // Closed while empty: only the closed condition is reported.
      vereign::sync::Channel<int> closed_ch{2};
      closed_ch.Close();
      result = closed_ch.TryAdd(value);
      CHECK_FALSE(result);
      CHECK_FALSE(result.IsOk());
      CHECK(result.IsClosed());
      CHECK_FALSE(result.IsFull());
    }
    
    // Exercises TryAdd with temporary arguments through the states of a
    // two-slot channel: accepted twice, rejected as full, rejected as closed and
    // full, and rejected as closed on a channel that was closed while empty.
    TEST_CASE("Channel::TryAdd rvalue reference", "[vereign/sync/channel]") {
      vereign::sync::Channel<std::string> queue{2};

      // First slot.
      auto outcome = queue.TryAdd("foo");
      CHECK(outcome);
      CHECK(outcome.IsOk());
      CHECK_FALSE(outcome.IsFull());
      CHECK_FALSE(outcome.IsClosed());

      // Second slot.
      outcome = queue.TryAdd("bar");
      CHECK(outcome);
      CHECK(outcome.IsOk());
      CHECK_FALSE(outcome.IsFull());
      CHECK_FALSE(outcome.IsClosed());

      // No free slot left: rejected as full.
      outcome = queue.TryAdd("baz");
      CHECK_FALSE(outcome);
      CHECK_FALSE(outcome.IsOk());
      CHECK(outcome.IsFull());
      CHECK_FALSE(outcome.IsClosed());

      // Closed while still full: both conditions are reported.
      queue.Close();
      outcome = queue.TryAdd("baz");
      CHECK_FALSE(outcome);
      CHECK_FALSE(outcome.IsOk());
      CHECK(outcome.IsFull());
      CHECK(outcome.IsClosed());

      // Closed while empty: only the closed condition is reported.
      vereign::sync::Channel<std::string> closed_queue{2};
      closed_queue.Close();
      auto closed_outcome = closed_queue.TryAdd("foo");
      CHECK_FALSE(closed_outcome);
      CHECK_FALSE(closed_outcome.IsOk());
      CHECK_FALSE(closed_outcome.IsFull());
      CHECK(closed_outcome.IsClosed());
    }