Last active
August 29, 2015 14:05
-
-
Save cjxgm/f0dc4c6a8aaaed21ab2d to your computer and use it in GitHub Desktop.
an inter-thread multiple-writer single-reader reliable communication channel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <string> | |
#include <functional> | |
#include <vector> | |
#include <type_traits> | |
#include <utility> | |
#include <mutex> | |
#include <condition_variable> | |
#include <thread> | |
#include <list> | |
#include <memory> | |
using std::cin; | |
using std::cout; | |
using std::endl; | |
struct channel | |
{ | |
template <class Event> | |
using listener_t = std::function<void(Event)>; | |
using listener_no_arg_t = std::function<void()>; | |
struct event | |
{ | |
event() = default; | |
event(const event&) = default; | |
event(event&&) = default; | |
event& operator=(const event&) = default; | |
event& operator=(event&&) = default; | |
virtual ~event() = default; | |
}; | |
using event_ptr = std::shared_ptr<event>; | |
using event_list = std::list<event_ptr>; | |
using listener_wrapper = std::function<bool(event*)>; | |
using listener_wrapper_list = std::vector<listener_wrapper>; | |
using condv = std::condition_variable; | |
using mutex = std::mutex; | |
using lock = std::unique_lock<mutex>; | |
template <class Event> | |
void listen(listener_t<Event> lsnr) | |
{ | |
static_assert(std::is_base_of<event, Event>(), | |
"Event must be a sub-class of channel::event"); | |
lock _(m); | |
listeners.push_back([lsnr](event* ev) { | |
auto p = dynamic_cast<Event*>(ev); | |
if (!p) return false; | |
lsnr(std::move(*p)); | |
return true; | |
}); | |
} | |
template <class Event> | |
void listen(listener_no_arg_t lsnr) | |
{ | |
listen<Event>([lsnr](Event) { lsnr(); }); | |
} | |
template <class Event> | |
void post(Event ev) | |
{ | |
static_assert(std::is_base_of<event, Event>(), | |
"Event must be a sub-class of channel::event"); | |
{ | |
lock _(m); | |
events.push_back(std::make_shared<Event>(std::move(ev))); | |
} | |
cv.notify_one(); | |
} | |
void wait() | |
{ | |
event_ptr ev; | |
{ | |
lock lck(m); | |
cv.wait(lck, [this] { return !events.empty(); }); | |
ev = events.front(); | |
events.pop_front(); | |
} | |
for (auto& lsnr: listeners) | |
if (lsnr(ev.get())) | |
return; | |
throw *ev.get(); | |
} | |
private: | |
condv cv; | |
mutex m; | |
listener_wrapper_list listeners; | |
event_list events; | |
}; | |
struct inspector | |
{ | |
inspector() { id = uid++; info("ctor"); } | |
~inspector() { info("dtor"); } | |
inspector(const inspector& ) { id = uid++; info("ctor copy"); } | |
inspector( inspector&&) { id = uid++; info("ctor move"); } | |
inspector& operator=(const inspector& ) { info("asgn copy"); return *this; } | |
inspector& operator=( inspector&&) { info("asgn move"); return *this; } | |
private: | |
using unique_id = int; | |
static unique_id uid; | |
unique_id id; | |
void info(std::string msg) | |
{ | |
constexpr const char* color_istr = "\e[0;35m"; | |
constexpr const char* color_id = "\e[1;32m"; | |
constexpr const char* color_msg = "\e[0;34m"; | |
constexpr const char* no_color = "\e[0m"; | |
std::cout | |
<< color_istr << "inspector[" | |
<< color_id << id | |
<< color_istr << "]: " | |
<< color_msg << msg | |
<< no_color << std::endl; | |
} | |
}; | |
inspector::unique_id inspector::uid; | |
int main() | |
{ | |
struct echo_event : channel::event | |
{ | |
inspector _; | |
std::string s; | |
echo_event(std::string s) : s{s} {} | |
}; | |
struct stop_event : channel::event | |
{ | |
inspector _; | |
}; | |
channel chn; | |
std::thread th_read([&] { | |
bool cont = true; | |
chn.listen<echo_event>([](echo_event ev) { | |
cout << "received: " << ev.s << endl; | |
}); | |
chn.listen<stop_event>([&] { | |
cout << "received stop" << endl; | |
cont = false; | |
}); | |
while (cont) chn.wait(); | |
}); | |
std::thread th_write([&] { | |
while (true) { | |
std::string cmd; | |
cout << "channel$ "; | |
if (!(cin >> cmd)) break; | |
if (cmd == "q" || cmd == "quit") break; | |
chn.post(echo_event{cmd}); | |
} | |
chn.post(stop_event{}); | |
}); | |
th_read.join(); | |
th_write.join(); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment