Skip to content

Instantly share code, notes, and snippets.

@sekia
Last active June 27, 2016 11:02
Show Gist options
  • Select an option

  • Save sekia/2b5f786b67fcb64d1aed93c96a91db1c to your computer and use it in GitHub Desktop.

Select an option

Save sekia/2b5f786b67fcb64d1aed93c96a91db1c to your computer and use it in GitHub Desktop.
Concurrent program with libcoro (http://software.schmorp.de/pkg/libcoro.html)
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <queue>
#include <stdexcept>
#include "coro.h"
class Coro {
public:
Coro(const Coro&) = delete;
Coro& operator=(const Coro&) = delete;
Coro()
: context_(new coro_context),
stack_(new coro_stack { .sptr = nullptr, .ssze = 0 }) {
coro_create(context_.get(), nullptr, nullptr, stack_->sptr, stack_->ssze);
}
Coro(Coro&& other) = default;
Coro(coro_func coro, void *arg = nullptr, std::size_t stack_size = 0)
: context_(new coro_context), stack_(new coro_stack) {
if (!coro_stack_alloc(stack_.get(), stack_size)) {
char error_str[256];
strerror_r(errno, error_str, sizeof(error_str) / sizeof(error_str[0]));
throw std::runtime_error(error_str);
}
coro_create(context_.get(), coro, arg, stack_->sptr, stack_->ssze);
}
~Coro() {
if (context_) { coro_destroy(context_.get()); }
if (stack_) { coro_stack_free(stack_.get()); }
}
Coro& operator=(Coro&& other) = default;
bool operator==(const Coro& other) const noexcept {
return context_ == other.context_ && stack_ == other.stack_;
}
bool operator!=(const Coro& other) const noexcept {
return !operator==(other);
}
coro_context& Context() { return *context_; }
std::size_t StackSize() const { return stack_->ssze; }
void Transfer(Coro& next) {
coro_transfer(context_.get(), &next.Context());
}
private:
std::unique_ptr<coro_context> context_;
std::unique_ptr<coro_stack> stack_;
};
class Scheduler {
public:
class Terminator {
public:
~Terminator() { Scheduler::GetInstance().Terminate(); }
};
Scheduler(const Scheduler&) = delete;
Scheduler(Scheduler&&) = delete;
Scheduler& operator=(const Scheduler&) = delete;
Scheduler& operator=(Scheduler&&) = delete;
void Cede() {
if (!Running()) { return; }
if (queue_.size() <= 1) { return; }
queue_.push(std::move(queue_.front()));
queue_.pop();
queue_.back().Transfer(queue_.front());
}
static Scheduler& GetInstance() {
static Scheduler instance;
return instance;
}
void Register(
coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) {
queue_.emplace(coro, arg, stack_size);
}
void Run() {
if (Running()) { return; }
if (queue_.empty()) { return; }
running_ = true;
main_.Transfer(queue_.front());
}
bool Running() const { return running_; }
void Terminate() {
if (!Running()) { return; }
Coro terminated = std::move(queue_.front());
queue_.pop();
Coro& next = queue_.empty() ? main_ : queue_.front();
if (next == main_) { running_ = false; }
terminated.Transfer(next);
}
private:
Scheduler() = default;
Coro main_;
std::queue<Coro> queue_;
bool running_ = false;
};
template <typename T>
class Channel {
public:
Channel(std::size_t capacity = 0) : capacity_(capacity) {}
void Close() { closed_ = true; }
bool Closed() const { return closed_; }
bool Empty() const { return queue_.empty(); }
T Get() {
while (!Closed() && Empty()) { Scheduler::GetInstance().Cede(); }
if (Closed() && Empty()) {
throw std::runtime_error("Trying fetch data from closed channel.");
}
T value(std::move(queue_.front()));
queue_.pop();
return value;
}
void Put(T value) {
while (CapacityIsSet() && queue_.size() >= capacity_) {
Scheduler::GetInstance().Cede();
}
queue_.push(value);
}
private:
bool CapacityIsSet() const { return capacity_ > 0; }
bool closed_ = false;
std::size_t capacity_;
std::queue<T> queue_;
};
#include <iostream>
int main() {
auto& sched = Scheduler::GetInstance();
Channel<int> ch(5);
sched.Register(
[](void *arg) {
Scheduler::Terminator guard;
auto ch = reinterpret_cast<Channel<int> *>(arg);
for (int i = 0; i < 10; ++i) {
ch->Put(i);
std::cout << "Producer: " << i << std::endl;
}
ch->Close();
},
reinterpret_cast<void *>(&ch));
sched.Register(
[](void *arg) {
Scheduler::Terminator guard;
auto ch = reinterpret_cast<Channel<int> *>(arg);
while (!(ch->Closed() && ch->Empty())) {
int got = ch->Get();
std::cout << "Consumer: " << got << std::endl;
}
},
reinterpret_cast<void *>(&ch));
sched.Run();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment