Skip to content

Instantly share code, notes, and snippets.

@cjxgm
Last active August 29, 2015 14:05
Show Gist options
  • Save cjxgm/f0dc4c6a8aaaed21ab2d to your computer and use it in GitHub Desktop.
Save cjxgm/f0dc4c6a8aaaed21ab2d to your computer and use it in GitHub Desktop.
an inter-thread multiple-writer single-reader reliable communication channel
#include <iostream>
#include <string>
#include <functional>
#include <vector>
#include <type_traits>
#include <utility>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <list>
#include <memory>
using std::cin;
using std::cout;
using std::endl;
struct channel
{
template <class Event>
using listener_t = std::function<void(Event)>;
using listener_no_arg_t = std::function<void()>;
struct event
{
event() = default;
event(const event&) = default;
event(event&&) = default;
event& operator=(const event&) = default;
event& operator=(event&&) = default;
virtual ~event() = default;
};
using event_ptr = std::shared_ptr<event>;
using event_list = std::list<event_ptr>;
using listener_wrapper = std::function<bool(event*)>;
using listener_wrapper_list = std::vector<listener_wrapper>;
using condv = std::condition_variable;
using mutex = std::mutex;
using lock = std::unique_lock<mutex>;
template <class Event>
void listen(listener_t<Event> lsnr)
{
static_assert(std::is_base_of<event, Event>(),
"Event must be a sub-class of channel::event");
lock _(m);
listeners.push_back([lsnr](event* ev) {
auto p = dynamic_cast<Event*>(ev);
if (!p) return false;
lsnr(std::move(*p));
return true;
});
}
template <class Event>
void listen(listener_no_arg_t lsnr)
{
listen<Event>([lsnr](Event) { lsnr(); });
}
template <class Event>
void post(Event ev)
{
static_assert(std::is_base_of<event, Event>(),
"Event must be a sub-class of channel::event");
{
lock _(m);
events.push_back(std::make_shared<Event>(std::move(ev)));
}
cv.notify_one();
}
void wait()
{
event_ptr ev;
{
lock lck(m);
cv.wait(lck, [this] { return !events.empty(); });
ev = events.front();
events.pop_front();
}
for (auto& lsnr: listeners)
if (lsnr(ev.get()))
return;
throw *ev.get();
}
private:
condv cv;
mutex m;
listener_wrapper_list listeners;
event_list events;
};
struct inspector
{
inspector() { id = uid++; info("ctor"); }
~inspector() { info("dtor"); }
inspector(const inspector& ) { id = uid++; info("ctor copy"); }
inspector( inspector&&) { id = uid++; info("ctor move"); }
inspector& operator=(const inspector& ) { info("asgn copy"); return *this; }
inspector& operator=( inspector&&) { info("asgn move"); return *this; }
private:
using unique_id = int;
static unique_id uid;
unique_id id;
void info(std::string msg)
{
constexpr const char* color_istr = "\e[0;35m";
constexpr const char* color_id = "\e[1;32m";
constexpr const char* color_msg = "\e[0;34m";
constexpr const char* no_color = "\e[0m";
std::cout
<< color_istr << "inspector["
<< color_id << id
<< color_istr << "]: "
<< color_msg << msg
<< no_color << std::endl;
}
};
inspector::unique_id inspector::uid;
int main()
{
struct echo_event : channel::event
{
inspector _;
std::string s;
echo_event(std::string s) : s{s} {}
};
struct stop_event : channel::event
{
inspector _;
};
channel chn;
std::thread th_read([&] {
bool cont = true;
chn.listen<echo_event>([](echo_event ev) {
cout << "received: " << ev.s << endl;
});
chn.listen<stop_event>([&] {
cout << "received stop" << endl;
cont = false;
});
while (cont) chn.wait();
});
std::thread th_write([&] {
while (true) {
std::string cmd;
cout << "channel$ ";
if (!(cin >> cmd)) break;
if (cmd == "q" || cmd == "quit") break;
chn.post(echo_event{cmd});
}
chn.post(stop_event{});
});
th_read.join();
th_write.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment