Last active
June 27, 2016 11:02
-
-
Save sekia/2b5f786b67fcb64d1aed93c96a91db1c to your computer and use it in GitHub Desktop.
Concurrent program with libcoro (http://software.schmorp.de/pkg/libcoro.html)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <algorithm> | |
| #include <cerrno> | |
| #include <cstring> | |
| #include <memory> | |
| #include <queue> | |
| #include <stdexcept> | |
| #include "coro.h" | |
| class Coro { | |
| public: | |
| Coro(const Coro&) = delete; | |
| Coro& operator=(const Coro&) = delete; | |
| Coro() | |
| : context_(new coro_context), | |
| stack_(new coro_stack { .sptr = nullptr, .ssze = 0 }) { | |
| coro_create(context_.get(), nullptr, nullptr, stack_->sptr, stack_->ssze); | |
| } | |
| Coro(Coro&& other) = default; | |
| Coro(coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) | |
| : context_(new coro_context), stack_(new coro_stack) { | |
| if (!coro_stack_alloc(stack_.get(), stack_size)) { | |
| char error_str[256]; | |
| strerror_r(errno, error_str, sizeof(error_str) / sizeof(error_str[0])); | |
| throw std::runtime_error(error_str); | |
| } | |
| coro_create(context_.get(), coro, arg, stack_->sptr, stack_->ssze); | |
| } | |
| ~Coro() { | |
| if (context_) { coro_destroy(context_.get()); } | |
| if (stack_) { coro_stack_free(stack_.get()); } | |
| } | |
| Coro& operator=(Coro&& other) = default; | |
| bool operator==(const Coro& other) const noexcept { | |
| return context_ == other.context_ && stack_ == other.stack_; | |
| } | |
| bool operator!=(const Coro& other) const noexcept { | |
| return !operator==(other); | |
| } | |
| coro_context& Context() { return *context_; } | |
| std::size_t StackSize() const { return stack_->ssze; } | |
| void Transfer(Coro& next) { | |
| coro_transfer(context_.get(), &next.Context()); | |
| } | |
| private: | |
| std::unique_ptr<coro_context> context_; | |
| std::unique_ptr<coro_stack> stack_; | |
| }; | |
| class Scheduler { | |
| public: | |
| class Terminator { | |
| public: | |
| ~Terminator() { Scheduler::GetInstance().Terminate(); } | |
| }; | |
| Scheduler(const Scheduler&) = delete; | |
| Scheduler(Scheduler&&) = delete; | |
| Scheduler& operator=(const Scheduler&) = delete; | |
| Scheduler& operator=(Scheduler&&) = delete; | |
| void Cede() { | |
| if (!Running()) { return; } | |
| if (queue_.size() <= 1) { return; } | |
| queue_.push(std::move(queue_.front())); | |
| queue_.pop(); | |
| queue_.back().Transfer(queue_.front()); | |
| } | |
| static Scheduler& GetInstance() { | |
| static Scheduler instance; | |
| return instance; | |
| } | |
| void Register( | |
| coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) { | |
| queue_.emplace(coro, arg, stack_size); | |
| } | |
| void Run() { | |
| if (Running()) { return; } | |
| if (queue_.empty()) { return; } | |
| running_ = true; | |
| main_.Transfer(queue_.front()); | |
| } | |
| bool Running() const { return running_; } | |
| void Terminate() { | |
| if (!Running()) { return; } | |
| Coro terminated = std::move(queue_.front()); | |
| queue_.pop(); | |
| Coro& next = queue_.empty() ? main_ : queue_.front(); | |
| if (next == main_) { running_ = false; } | |
| terminated.Transfer(next); | |
| } | |
| private: | |
| Scheduler() = default; | |
| Coro main_; | |
| std::queue<Coro> queue_; | |
| bool running_ = false; | |
| }; | |
| template <typename T> | |
| class Channel { | |
| public: | |
| Channel(std::size_t capacity = 0) : capacity_(capacity) {} | |
| void Close() { closed_ = true; } | |
| bool Closed() const { return closed_; } | |
| bool Empty() const { return queue_.empty(); } | |
| T Get() { | |
| while (!Closed() && Empty()) { Scheduler::GetInstance().Cede(); } | |
| if (Closed() && Empty()) { | |
| throw std::runtime_error("Trying fetch data from closed channel."); | |
| } | |
| T value(std::move(queue_.front())); | |
| queue_.pop(); | |
| return value; | |
| } | |
| void Put(T value) { | |
| while (CapacityIsSet() && queue_.size() >= capacity_) { | |
| Scheduler::GetInstance().Cede(); | |
| } | |
| queue_.push(value); | |
| } | |
| private: | |
| bool CapacityIsSet() const { return capacity_ > 0; } | |
| bool closed_ = false; | |
| std::size_t capacity_; | |
| std::queue<T> queue_; | |
| }; | |
| #include <iostream> | |
| int main() { | |
| auto& sched = Scheduler::GetInstance(); | |
| Channel<int> ch(5); | |
| sched.Register( | |
| [](void *arg) { | |
| Scheduler::Terminator guard; | |
| auto ch = reinterpret_cast<Channel<int> *>(arg); | |
| for (int i = 0; i < 10; ++i) { | |
| ch->Put(i); | |
| std::cout << "Producer: " << i << std::endl; | |
| } | |
| ch->Close(); | |
| }, | |
| reinterpret_cast<void *>(&ch)); | |
| sched.Register( | |
| [](void *arg) { | |
| Scheduler::Terminator guard; | |
| auto ch = reinterpret_cast<Channel<int> *>(arg); | |
| while (!(ch->Closed() && ch->Empty())) { | |
| int got = ch->Get(); | |
| std::cout << "Consumer: " << got << std::endl; | |
| } | |
| }, | |
| reinterpret_cast<void *>(&ch)); | |
| sched.Run(); | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment