Created
November 20, 2019 22:31
-
-
Save ramunas/115d5107bf32df1ff49cae1c2cf77c55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <mutex> | |
#include <thread> | |
#include <condition_variable> | |
using namespace std; | |
/// Unbuffered (rendezvous) channel that hands values of type T from one
/// thread to another.
///
/// send() returns only after a receiver has taken the value; recv() returns
/// only after a sender has offered one. Any number of threads may call
/// send() and recv() concurrently: senders are serialised among themselves,
/// and so are receivers.
///
/// T must be default-constructible, copy-constructible and copy-assignable.
template<typename T>
class channel
{
private:
    T payload;                    // value on offer; meaningful only while data_available
    bool data_available;          // true between a sender's offer and its consumption
    bool ack;                     // set by the receiver, cleared by the sender
    std::mutex sender_mutex;      // admits one sender at a time
    std::mutex receiver_mutex;    // admits one receiver at a time
    std::mutex synch;             // guards payload, data_available, ack and sender_id
    std::condition_variable data_availability; // only ever waited on with `synch`
    std::thread::id sender_id;    // thread that made the current offer

public:
    /// Creates an empty channel. sender_id starts out as the default
    /// thread::id, which identifies no thread.
    channel() : payload(), data_available(false), ack(false)
    {
    }

    /// Offers `d` to a receiver and blocks until it has been consumed.
    void send(T d)
    {
        std::unique_lock<std::mutex> sl(sender_mutex); // only one sender at a time
        std::unique_lock<std::mutex> dl(synch);

        // Wait until any previous value has been consumed.
        data_availability.wait(dl, [this]() { return !data_available; });

        payload = d;
        sender_id = std::this_thread::get_id(); // lets recv() reject its own offer
        data_available = true;
        data_availability.notify_all();         // data available

        // Block until the receiver acknowledges, then reset the flag while
        // sender_mutex is still held so the next sender starts clean.
        data_availability.wait(dl, [this]() { return ack; });
        ack = false;
    }

    /// Blocks until a value offered by another thread is available, consumes
    /// it and returns it.
    T recv()
    {
        std::unique_lock<std::mutex> rl(receiver_mutex); // one receiver at a time
        std::unique_lock<std::mutex> dl(synch);

        // Both conditions are checked under `synch`: the condition variable
        // must always be waited on with the same mutex, and sender_id is
        // written under that mutex in send().
        const std::thread::id self = std::this_thread::get_id();
        data_availability.wait(dl, [this, self]() {
            return data_available && sender_id != self; // don't receive from oneself
        });

        T d = payload;
        data_available = false;         // consume data
        ack = true;
        data_availability.notify_all(); // send acknowledgement
        return d;
    }
};
int main (int argc, char **argv) | |
{ | |
channel<int> c; | |
thread t1([&c]() { | |
// cout << "Sending 11 " << this_thread::get_id() << endl; | |
// c.send(11); | |
// cout << "Receive 1" << endl; | |
// auto x = c.recv(); | |
// cout << "Receive 1" << endl; | |
// auto y = c.recv(); | |
for (int i = 0; i < 200; i++) { | |
auto x = c.recv(); | |
cout << "--------------------Received " << x << endl; | |
} | |
}); | |
thread t2([&c]() { | |
// cout << "Receive 2" << endl; | |
// auto x = c.recv(); | |
// cout << "Sending 22" << endl; | |
// c.send(22); | |
for (int i = 0; i < 125; i++) { | |
c.send(33); | |
} | |
}); | |
thread t3([&c]() { | |
// cout << "Sending 33" << endl; | |
// c.send(33); | |
// cout << "33 terminated" << endl; | |
for (int i = 0; i < 200-125; i++) { | |
c.send(44); | |
} | |
}); | |
t1.join(); | |
t2.join(); | |
t3.join(); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment