Created
December 20, 2014 14:58
-
-
Save cjxgm/d7dd29b70d06aa67876e to your computer and use it in GitHub Desktop.
single rt-safe reader, single rt-UNSAFE writer communication channel, for homogeneous type
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
using std::cin; | |
using std::cout; | |
using std::cerr; | |
using std::endl; | |
#include <cmath> | |
#include <cstdlib> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
#include <queue> | |
#include <atomic> | |
#include <chrono> | |
namespace realtime | |
{ | |
// realtime::communicator is a single-typed data communication channel
// from a non-rt thread to an rt thread:
//   - single reader, single writer
//   - the reader side is rt-safe
//   - the writer side is rt-UNSAFE
/// Fixed-capacity FIFO ring buffer intended for exactly one producer thread
/// (calling push()) and one consumer thread (calling pop()).
///
/// The only state shared between the two sides is the atomic element count
/// `used`: `tail` is touched only by push() and `head` only by pop().
/// Neither push() nor pop() checks its precondition; the caller must test
/// full() / empty() (or operator bool) first.
///
/// @tparam T         element type; must be default-constructible and
///                   move-assignable, because slots live in a plain array.
/// @tparam Capacity  number of slots; must be positive.
template <class T, int Capacity=32>
struct ringbuffer
{
    static_assert(Capacity > 0, "ringbuffer capacity must be positive");

    using value_type = T;
    static constexpr auto capacity() { return Capacity; }
    using size_type = std::atomic<std::size_t>;

private:
    size_type used{0};              // number of occupied slots (shared)
    value_type values[Capacity];    // slot storage
    int head{0};                    // next slot to read  (consumer only)
    int tail{0};                    // next slot to write (producer only)

public:
    /// Number of elements currently stored.
    std::size_t size() const { return used.load(); }

    /// True when every slot is occupied, i.e. push() must not be called.
    bool full() const { return size() == static_cast<std::size_t>(capacity()); }

    /// True when no slot is occupied, i.e. pop() must not be called.
    bool empty() const { return size() == 0; }

    /// Converts to true while there is room for at least one more push().
    operator bool () const { return !full(); }

    /// Stores `x` in the next free slot.
    /// Precondition (unchecked): !full().
    void push(value_type x)
    {
        values[tail] = std::move(x);
        tail = (tail + 1 == capacity()) ? 0 : tail + 1;
        // Bump the count only after the slot is written, so a consumer that
        // observes the new size also observes the stored value.
        ++used;
    }

    /// Removes and returns the oldest element.
    /// Precondition (unchecked): !empty().
    value_type pop()
    {
        value_type x = std::move(values[head]);
        head = (head + 1 == capacity()) ? 0 : head + 1;
        // Release the slot only after its value has been moved out.
        --used;
        return x;   // plain return: lets the compiler elide the move
    }
};
template <class T, int Capacity=32> | |
struct communicator | |
{ | |
using value_type = T; | |
static constexpr auto capacity() { return Capacity; } | |
using mutex = std::mutex; | |
using ulock = std::unique_lock<mutex>; | |
using condv = std::condition_variable; | |
using thread = std::thread; | |
using tank = std::queue<value_type>; | |
using buffer = ringbuffer<T, capacity()>; | |
private: | |
tank tk; | |
buffer buf; | |
std::atomic<typename tank::size_type> tk_size; | |
thread scheduler; | |
mutex m; | |
condv cv; | |
std::atomic_bool cont{true}; | |
public: | |
communicator() | |
{ | |
scheduler = thread{[this] { | |
while (true) { | |
value_type x; | |
{ | |
ulock lck(m); | |
cv.wait(lck, [this]{ return (!cont || (tank_size() && buf)); }); | |
if (cont) cout << "wake up: " << tank_size() << " -> " << size() << endl; | |
else cout << "wake up: exit" << endl; | |
if (!cont) break; | |
x = tk.front(); | |
tk.pop(); | |
tk_size = tk.size(); | |
} | |
buf.push(std::move(x)); | |
} | |
}}; | |
} | |
~communicator() | |
{ | |
cont = false; | |
if (scheduler.joinable()) { | |
{ | |
ulock _(m); | |
cv.notify_one(); | |
} | |
scheduler.join(); | |
} | |
} | |
auto tank_size() const { return tk.size(); } | |
auto tank_size__rt_safe() const { return tk_size.load(); } | |
auto size() const { return buf.size(); } | |
auto empty() const { return buf.empty(); } | |
operator bool () const { return !empty(); } | |
// for non-rt thread | |
void push(value_type x) | |
{ | |
ulock _(m); | |
tk.push(std::move(x)); | |
tk_size = tk.size(); | |
cv.notify_one(); | |
} | |
// for rt_thread | |
auto pop() | |
{ | |
auto x = buf.pop(); | |
cv.notify_one(); | |
return std::move(x); | |
} | |
}; | |
} | |
/// Suspends the calling thread for `ms` milliseconds using
/// std::this_thread::sleep_for.
void sleep(int ms)
{
    const std::chrono::milliseconds pause(ms);
    std::this_thread::sleep_for(pause);
}
int main() | |
{ | |
realtime::communicator<int, 8> comm; | |
std::atomic_bool cont{true}; | |
std::thread writer{[&cont, &comm] { | |
int i = 0; | |
while (cont) { | |
comm.push(i++); | |
sleep(std::rand() % 300); | |
} | |
}}; | |
std::thread reader{[&cont, &comm] { | |
while (cont) { | |
int t = 100+1000 * std::pow(0.8, comm.size() + comm.tank_size__rt_safe()); | |
cout << "sleep for " << t << "ms" << endl; | |
sleep(t); | |
if (comm) { | |
auto i = comm.pop(); | |
cout << "got " << i << endl; | |
} | |
} | |
}}; | |
cout << "started" << endl; | |
std::string whatever; | |
while (cin >> whatever) {} | |
cout << "exiting..." << endl; | |
cont = false; | |
if (writer.joinable()) writer.join(); | |
if (reader.joinable()) reader.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment