Last active
June 9, 2021 07:02
-
-
Save sysroad/d1f004e82a15b0ae2cd9b2e75d0ecc06 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <concurrent_queue.h>
#include <concurrent_unordered_map.h>
using namespace std;
// Interface used to query the type ID of a message.
class IMessage
{
public:
    // Virtual so that destroying a derived message through an IMessage
    // pointer runs the derived destructor (and releases its payload).
    virtual ~IMessage() = default;

    // Returns the message type ID. The base implementation returns -1;
    // concrete message types override it.
    virtual int ID() const { return -1; }
};
// T 타입 데이터를 참조 포인터로 갖는 메시지 | |
template<typename T> class Message : IMessage | |
{ | |
protected: | |
shared_ptr<T> data; | |
Message(shared_ptr<T> data) | |
{ | |
this->data.reset(data); | |
} | |
Message(T data) | |
{ | |
this->data.reset(new T(data)); | |
} | |
}; | |
class MsgDispatcher; | |
// 메시지를 보내고 받는 객체 베이스 | |
class MsgUser | |
{ | |
protected: | |
MsgDispatcher* dispatcher; | |
int user_id; | |
typedef function<void(IMessage*)> MsgHandler; | |
map<int, MsgHandler> msg_handlers; | |
public: | |
MsgUser(MsgDispatcher* dispatcher); | |
~MsgUser(); | |
void handle_message(IMessage* msg); | |
}; | |
// 메시지 분배기 | |
class MsgDispatcher | |
{ | |
typedef Concurrency::concurrent_queue<shared_ptr<IMessage>> MessageQue; | |
typedef Concurrency::concurrent_unordered_map<int, MsgUser*> UserMap; | |
typedef shared_mutex Lock; | |
typedef unique_lock<Lock> WriterLock; | |
typedef shared_lock<Lock> ReaderLock; | |
Lock user_que_lock; | |
mutex msg_mutex; | |
condition_variable msg_event; | |
MessageQue msgQ; | |
UserMap user_map; | |
atomic<int> id = 0; | |
public: | |
MsgDispatcher(); | |
~MsgDispatcher(); | |
int subscribe(MsgUser* user); | |
void unsubscribe(int id); | |
void put(shared_ptr<IMessage> m); | |
private: | |
void MsgProc(); | |
}; | |
// Test message carrying a string payload; its message type ID is 1.
class TestMsg : public Message<std::string>
{
public:
    // constexpr static data members are implicitly inline in C++17, so
    // binding `id` to a reference (for example when it is passed to
    // make_pair) does not need a separate out-of-class definition.
    static constexpr int id = 1;

    // Stores `s` as the message payload.
    TestMsg(std::string s)
        : Message<std::string>(std::move(s))
    {
    }

    // Returns the message type ID of TestMsg.
    int ID() const override { return id; }

    // Writes the payload (as a C string) followed by a newline to stdout and
    // flushes the stream.
    void print_message() const
    {
        std::cout << data->c_str() << std::endl;
    }
};
// Starts the dispatch loop (MsgProc) on a background thread and detaches it.
// NOTE(review): the thread keeps using `this` and is never joined, so the
// dispatcher object has to outlive it - confirm this holds for every owner.
MsgDispatcher::MsgDispatcher()
{
    std::thread worker(&MsgDispatcher::MsgProc, this);
    worker.detach();
}
// Performs no explicit cleanup; members are destroyed in the usual order.
// It neither checks that all users have unsubscribed nor stops the
// dispatch thread.
MsgDispatcher::~MsgDispatcher() = default;
// Registers `user` under a freshly issued ID and returns that ID.
// The lock is taken in shared mode, presumably because insertion into the
// concurrent map is safe alongside other inserts and iteration; only
// erasure takes the lock exclusively.
int MsgDispatcher::subscribe(MsgUser* user)
{
    ReaderLock shared_guard(user_que_lock);
    const int assigned_id = ++id;
    user_map.insert(std::make_pair(assigned_id, user));
    return assigned_id;
}
void MsgDispatcher::unsubscribe(int id) | |
{ | |
auto find = user_map.find(id); | |
if (find != user_map.end()) | |
{ | |
WriterLock lock(user_que_lock); | |
user_map.unsafe_erase(id); | |
} | |
} | |
void MsgDispatcher::put(shared_ptr<IMessage> m) | |
{ | |
msgQ.push(move(m)); | |
msg_event.notify_one(); | |
} | |
void MsgDispatcher::MsgProc() | |
{ | |
while (true) | |
{ | |
unique_lock<mutex> lock(msg_mutex); | |
msg_event.wait(lock, [this]() -> bool { return !msgQ.empty(); }); | |
shared_ptr<IMessage> m; | |
if (msgQ.try_pop(m)) | |
{ | |
ReaderLock lock(user_que_lock); | |
for (auto iter = user_map.begin(); iter != user_map.end(); ++iter) | |
{ | |
((*iter).second)->handle_message(m.get()); | |
} | |
} | |
} | |
} | |
// Stores the dispatcher and subscribes this user to it.
// With a null dispatcher nothing is subscribed and user_id is set to -1,
// which the destructor treats as "not subscribed".
MsgUser::MsgUser(MsgDispatcher* dispatcher)
    : dispatcher(dispatcher)
{
    // subscribe() hands `this` to the dispatcher, so it is called from the
    // constructor body, after every member (including msg_handlers) has been
    // constructed, rather than from the member initializer list.
    user_id = (dispatcher != nullptr) ? dispatcher->subscribe(this) : -1;
}
// Unsubscribes this user from its dispatcher; a negative user_id means there
// is nothing to unsubscribe.
MsgUser::~MsgUser()
{
    if (user_id < 0)
        return;

    dispatcher->unsubscribe(user_id);
}
// Passes `msg` to the handler registered for msg->ID().
// Null messages and message IDs without a registered handler are ignored.
// Exceptions thrown by the handler are reported on stderr and not
// propagated, so one failing handler does not take down the caller.
void MsgUser::handle_message(IMessage* msg)
{
    if (msg == nullptr)
        return;

    const int msg_id = msg->ID();
    const auto entry = msg_handlers.find(msg_id);
    if (entry == msg_handlers.end())
        return;

    try
    {
        entry->second(msg);
    }
    catch (const std::exception& e)
    {
        std::cerr << "MsgUser " << user_id << ": handler for message "
                  << msg_id << " threw: " << e.what() << std::endl;
    }
    catch (...)
    {
        // Anything not derived from std::exception is contained as well;
        // letting it escape would unwind into the calling thread.
        std::cerr << "MsgUser " << user_id << ": handler for message "
                  << msg_id << " threw an unknown exception" << std::endl;
    }
}
class Sender : public MsgUser | |
{ | |
public: | |
Sender(MsgDispatcher* dispatcher) | |
: MsgUser(dispatcher) | |
{ | |
} | |
void put_test_message(string const& s) | |
{ | |
dispatcher->put(shared_ptr<IMessage>((IMessage*)new TestMsg(s))); | |
} | |
}; | |
class Recver : public MsgUser | |
{ | |
public: | |
Recver(MsgDispatcher* dispatcher) | |
: MsgUser(dispatcher) | |
{ | |
msg_handlers.insert(make_pair(TestMsg::id, bind(&Recver::recv_test_message, this, placeholders::_1))); | |
} | |
void recv_test_message(IMessage* msg) | |
{ | |
((TestMsg*)msg)->print_message(); | |
} | |
}; | |
int main() | |
{ | |
MsgDispatcher disp; | |
auto t1 = thread([&]() { | |
for ( int i = 0; i < 100; i++) | |
{ | |
Recver recver(&disp); | |
Sender sender(&disp); | |
sender.put_test_message("[tr1] hello world? are you hear me?"); | |
this_thread::sleep_for(chrono::milliseconds(8)); | |
sender.put_test_message("[tr1] hello ?????"); | |
this_thread::sleep_for(chrono::milliseconds(5)); | |
}}); | |
auto t2 = thread([&]() { | |
for (int i = 0; i < 100; i++) | |
{ | |
Recver recver(&disp); | |
Sender sender(&disp); | |
sender.put_test_message("[tr2] hello world? are you hear me?"); | |
this_thread::sleep_for(chrono::milliseconds(10)); | |
sender.put_test_message("[tr2] hello ?????"); | |
this_thread::sleep_for(chrono::milliseconds(3)); | |
}}); | |
auto t3 = thread([&]() { | |
for (int i = 0; i < 100; i++) | |
{ | |
Recver recver(&disp); | |
Sender sender(&disp); | |
sender.put_test_message("[tr3] hello world? are you hear me?"); | |
this_thread::sleep_for(chrono::milliseconds(2)); | |
sender.put_test_message("[tr3] hello ?????"); | |
this_thread::sleep_for(chrono::milliseconds(3)); | |
}}); | |
t1.join(); | |
t2.join(); | |
t3.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment