Skip to content

Instantly share code, notes, and snippets.

@cjxgm
Created December 20, 2014 14:58
Show Gist options
  • Save cjxgm/d7dd29b70d06aa67876e to your computer and use it in GitHub Desktop.
Save cjxgm/d7dd29b70d06aa67876e to your computer and use it in GitHub Desktop.
single rt-safe reader, single rt-UNSAFE writer communication channel, for homogeneous type
#include <iostream>
using std::cin;
using std::cout;
using std::cerr;
using std::endl;
#include <cmath>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <chrono>
namespace realtime
{
// realtime::communicator is a single-typed data communication channel
// from non-rt thread to rt thread
// single reader, single writer
// reader side is rt-safe
// writer side is rt-UNSAFE
template <class T, int Capacity=32>
struct ringbuffer
{
using value_type = T;
static constexpr auto capacity() { return Capacity; }
using size_type = std::atomic<size_t>;
private:
size_type used{0};
value_type values[capacity()];
int head{0};
int tail{0};
public:
auto size() const { return used.load(); }
auto full() const { return (size() == capacity()); }
auto empty() const { return !size(); }
operator bool () const { return !full(); }
auto push(value_type x)
{
values[tail++] = std::move(x);
if (tail == capacity()) tail = 0;
++used;
}
auto pop()
{
value_type x = std::move(values[head++]);
if (head == capacity()) head = 0;
used--;
return std::move(x);
}
};
template <class T, int Capacity=32>
struct communicator
{
using value_type = T;
static constexpr auto capacity() { return Capacity; }
using mutex = std::mutex;
using ulock = std::unique_lock<mutex>;
using condv = std::condition_variable;
using thread = std::thread;
using tank = std::queue<value_type>;
using buffer = ringbuffer<T, capacity()>;
private:
tank tk;
buffer buf;
std::atomic<typename tank::size_type> tk_size;
thread scheduler;
mutex m;
condv cv;
std::atomic_bool cont{true};
public:
communicator()
{
scheduler = thread{[this] {
while (true) {
value_type x;
{
ulock lck(m);
cv.wait(lck, [this]{ return (!cont || (tank_size() && buf)); });
if (cont) cout << "wake up: " << tank_size() << " -> " << size() << endl;
else cout << "wake up: exit" << endl;
if (!cont) break;
x = tk.front();
tk.pop();
tk_size = tk.size();
}
buf.push(std::move(x));
}
}};
}
~communicator()
{
cont = false;
if (scheduler.joinable()) {
{
ulock _(m);
cv.notify_one();
}
scheduler.join();
}
}
auto tank_size() const { return tk.size(); }
auto tank_size__rt_safe() const { return tk_size.load(); }
auto size() const { return buf.size(); }
auto empty() const { return buf.empty(); }
operator bool () const { return !empty(); }
// for non-rt thread
void push(value_type x)
{
ulock _(m);
tk.push(std::move(x));
tk_size = tk.size();
cv.notify_one();
}
// for rt_thread
auto pop()
{
auto x = buf.pop();
cv.notify_one();
return std::move(x);
}
};
}
void sleep(int ms)
{
using namespace std::chrono;
std::this_thread::sleep_for(milliseconds{ms});
}
int main()
{
realtime::communicator<int, 8> comm;
std::atomic_bool cont{true};
std::thread writer{[&cont, &comm] {
int i = 0;
while (cont) {
comm.push(i++);
sleep(std::rand() % 300);
}
}};
std::thread reader{[&cont, &comm] {
while (cont) {
int t = 100+1000 * std::pow(0.8, comm.size() + comm.tank_size__rt_safe());
cout << "sleep for " << t << "ms" << endl;
sleep(t);
if (comm) {
auto i = comm.pop();
cout << "got " << i << endl;
}
}
}};
cout << "started" << endl;
std::string whatever;
while (cin >> whatever) {}
cout << "exiting..." << endl;
cont = false;
if (writer.joinable()) writer.join();
if (reader.joinable()) reader.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment