Skip to content

Instantly share code, notes, and snippets.

@ramunas
Created November 20, 2019 22:31
Show Gist options
  • Save ramunas/115d5107bf32df1ff49cae1c2cf77c55 to your computer and use it in GitHub Desktop.
Save ramunas/115d5107bf32df1ff49cae1c2cf77c55 to your computer and use it in GitHub Desktop.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
using namespace std;
/// Unbuffered (rendezvous) channel for handing values of type T between threads.
///
/// send() publishes one value and blocks until a receiver has taken it;
/// recv() blocks until a value has been published and returns it.
/// Several threads may call send() and several may call recv(); calls on each
/// side are serialized by a per-side mutex, so exactly one sender is paired
/// with exactly one receiver at a time.
///
/// T must be default-constructible (for the internal slot), move- or
/// copy-assignable, and move- or copy-constructible.
template<typename T>
class channel
{
private:
    T payload;                                  // slot for the value in flight; valid only while data_available
    bool data_available = false;                // true from publication in send() until consumption in recv()
    bool ack = false;                           // set by recv() to release the blocked sender
    std::mutex sender_mutex;                    // serializes senders
    std::mutex receiver_mutex;                  // serializes receivers
    std::mutex synch;                           // guards payload, data_available, ack and sender_id
    std::condition_variable data_availability;  // always waited on with `synch`
    std::thread::id sender_id;                  // thread that published the current/last value

public:
    /// Creates an empty channel. sender_id starts as the constructing thread's id.
    channel() : sender_id(std::this_thread::get_id())
    {
    }

    /// Publishes `d` and blocks until a receiver has consumed it.
    void send(T d)
    {
        std::unique_lock<std::mutex> sl(sender_mutex); // only one sender at a time
        std::unique_lock<std::mutex> dl(synch);

        // Wait until the previous value (if any) has been consumed.
        data_availability.wait(dl, [this]() { return !data_available; });

        payload = std::move(d);
        data_available = true;
        sender_id = std::this_thread::get_id(); // note the thread id so a receiver never takes its own value
        data_availability.notify_all();         // data available

        // Rendezvous: stay blocked until the receiver acknowledges.
        data_availability.wait(dl, [this]() { return ack; });
        ack = false; // reset for the next exchange
    }

    /// Blocks until a value published by another thread is available, then
    /// returns it and releases the sender that published it.
    T recv()
    {
        std::unique_lock<std::mutex> rl(receiver_mutex); // one receiver at a time
        std::unique_lock<std::mutex> dl(synch);

        // Both conditions are checked in a single wait under `synch`: the
        // condition variable must always be used with the same mutex, and
        // sender_id may only be read while `synch` is held. The self-check is
        // defensive: while data_available is true the publishing thread is
        // still blocked inside send(), so it cannot be the caller.
        const std::thread::id self = std::this_thread::get_id();
        data_availability.wait(dl, [this, self]() {
            return data_available && sender_id != self;
        });

        T d = std::move(payload);
        ack = true;
        data_available = false;         // consume data
        data_availability.notify_all(); // send acknowledgement
        return d;
    }
};
/// Demo driver: one receiver thread drains a channel<int> while two sender
/// threads feed it (one sends 33s, the other 44s).
///
/// The channel is a rendezvous channel, so every send must be matched by a
/// receive. The receiver's iteration count is therefore derived from the two
/// sender counts instead of being a separate literal; if they disagreed, one
/// side would block forever and the joins below would never return.
int main (int argc, char **argv)
{
    (void)argc; // command-line arguments are not used
    (void)argv;

    // `static` so the lambdas below can use these without capturing them.
    static constexpr int first_sender_count = 125;
    static constexpr int second_sender_count = 200 - 125;
    static constexpr int total_messages = first_sender_count + second_sender_count;

    channel<int> c;

    // Receiver: takes exactly as many values as the senders publish in total.
    std::thread t1([&c]() {
        for (int i = 0; i < total_messages; i++) {
            auto x = c.recv();
            std::cout << "--------------------Received " << x << std::endl;
        }
    });

    // First sender.
    std::thread t2([&c]() {
        for (int i = 0; i < first_sender_count; i++) {
            c.send(33);
        }
    });

    // Second sender.
    std::thread t3([&c]() {
        for (int i = 0; i < second_sender_count; i++) {
            c.send(44);
        }
    });

    t1.join();
    t2.join();
    t3.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment