Skip to content

Instantly share code, notes, and snippets.

@sysroad
Last active June 9, 2021 07:02
Show Gist options
  • Save sysroad/d1f004e82a15b0ae2cd9b2e75d0ecc06 to your computer and use it in GitHub Desktop.
Save sysroad/d1f004e82a15b0ae2cd9b2e75d0ecc06 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <thread>
#include <vector>
#include <map>
#include <functional>
#include <concurrent_queue.h>
#include <concurrent_unordered_map.h>
#include <shared_mutex>
#include <condition_variable>
#include <atomic>
#include <future>
using namespace std;
// ID 확인을 위한 인터페이스
class IMessage
{
public:
virtual int ID() const { return -1; }
};
// T 타입 데이터를 참조 포인터로 갖는 메시지
template<typename T> class Message : IMessage
{
protected:
shared_ptr<T> data;
Message(shared_ptr<T> data)
{
this->data.reset(data);
}
Message(T data)
{
this->data.reset(new T(data));
}
};
class MsgDispatcher;
// 메시지를 보내고 받는 객체 베이스
class MsgUser
{
protected:
MsgDispatcher* dispatcher;
int user_id;
typedef function<void(IMessage*)> MsgHandler;
map<int, MsgHandler> msg_handlers;
public:
MsgUser(MsgDispatcher* dispatcher);
~MsgUser();
void handle_message(IMessage* msg);
};
// 메시지 분배기
class MsgDispatcher
{
typedef Concurrency::concurrent_queue<shared_ptr<IMessage>> MessageQue;
typedef Concurrency::concurrent_unordered_map<int, MsgUser*> UserMap;
typedef shared_mutex Lock;
typedef unique_lock<Lock> WriterLock;
typedef shared_lock<Lock> ReaderLock;
Lock user_que_lock;
mutex msg_mutex;
condition_variable msg_event;
MessageQue msgQ;
UserMap user_map;
atomic<int> id = 0;
public:
MsgDispatcher();
~MsgDispatcher();
int subscribe(MsgUser* user);
void unsubscribe(int id);
void put(shared_ptr<IMessage> m);
private:
void MsgProc();
};
class TestMsg : public Message<string>
{
public:
TestMsg(string s)
: Message(s)
{
}
static const int id = 1;
virtual int ID() const { return id; }
void print_message()
{
cout << data.get()->c_str() << endl;
}
};
MsgDispatcher::MsgDispatcher()
{
thread([this]() { MsgProc(); }).detach();
}
MsgDispatcher::~MsgDispatcher()
{
/*if (user_map.empty())
{
cout << "all clear" << endl;
}*/
}
int MsgDispatcher::subscribe(MsgUser* user)
{
ReaderLock lock(user_que_lock);
int new_id = ++id;
user_map.insert(make_pair(new_id, user));
return new_id;
}
void MsgDispatcher::unsubscribe(int id)
{
auto find = user_map.find(id);
if (find != user_map.end())
{
WriterLock lock(user_que_lock);
user_map.unsafe_erase(id);
}
}
void MsgDispatcher::put(shared_ptr<IMessage> m)
{
msgQ.push(move(m));
msg_event.notify_one();
}
void MsgDispatcher::MsgProc()
{
while (true)
{
unique_lock<mutex> lock(msg_mutex);
msg_event.wait(lock, [this]() -> bool { return !msgQ.empty(); });
shared_ptr<IMessage> m;
if (msgQ.try_pop(m))
{
ReaderLock lock(user_que_lock);
for (auto iter = user_map.begin(); iter != user_map.end(); ++iter)
{
((*iter).second)->handle_message(m.get());
}
}
}
}
MsgUser::MsgUser(MsgDispatcher* dispatcher)
: dispatcher(dispatcher)
{
user_id = dispatcher->subscribe(this);
}
MsgUser::~MsgUser()
{
if (0 <= user_id)
dispatcher->unsubscribe(user_id);
}
void MsgUser::handle_message(IMessage* msg)
{
auto handle = msg_handlers.find(msg->ID());
if (handle != msg_handlers.end())
{
try
{
(*handle).second(msg);
}
catch (const std::exception&)
{
}
}
}
class Sender : public MsgUser
{
public:
Sender(MsgDispatcher* dispatcher)
: MsgUser(dispatcher)
{
}
void put_test_message(string const& s)
{
dispatcher->put(shared_ptr<IMessage>((IMessage*)new TestMsg(s)));
}
};
class Recver : public MsgUser
{
public:
Recver(MsgDispatcher* dispatcher)
: MsgUser(dispatcher)
{
msg_handlers.insert(make_pair(TestMsg::id, bind(&Recver::recv_test_message, this, placeholders::_1)));
}
void recv_test_message(IMessage* msg)
{
((TestMsg*)msg)->print_message();
}
};
int main()
{
MsgDispatcher disp;
auto t1 = thread([&]() {
for ( int i = 0; i < 100; i++)
{
Recver recver(&disp);
Sender sender(&disp);
sender.put_test_message("[tr1] hello world? are you hear me?");
this_thread::sleep_for(chrono::milliseconds(8));
sender.put_test_message("[tr1] hello ?????");
this_thread::sleep_for(chrono::milliseconds(5));
}});
auto t2 = thread([&]() {
for (int i = 0; i < 100; i++)
{
Recver recver(&disp);
Sender sender(&disp);
sender.put_test_message("[tr2] hello world? are you hear me?");
this_thread::sleep_for(chrono::milliseconds(10));
sender.put_test_message("[tr2] hello ?????");
this_thread::sleep_for(chrono::milliseconds(3));
}});
auto t3 = thread([&]() {
for (int i = 0; i < 100; i++)
{
Recver recver(&disp);
Sender sender(&disp);
sender.put_test_message("[tr3] hello world? are you hear me?");
this_thread::sleep_for(chrono::milliseconds(2));
sender.put_test_message("[tr3] hello ?????");
this_thread::sleep_for(chrono::milliseconds(3));
}});
t1.join();
t2.join();
t3.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment