Last active
November 26, 2021 21:34
-
-
Save kala13x/622a6901a79abe3e9cbb3c4c012074e7 to your computer and use it in GitHub Desktop.
Simple chat server with subscribe pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * upwork/singles/chat_server.cpp
 *
 * 2021 (c) Sun Dro ([email protected])
 *
 * EPOLL based high performance chat server with subscribe pattern
 * Compile: g++ -g -O2 -Wall chat_server.cpp -o chat_server -lpthread
 */
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <deque>
/* File the logger thread appends every received bus message to */
#define LOG_FILE_PATH "chat_server_messages.log"

/*
 * 127.0.0.1 in network byte order. Computed with htonl() so the value is
 * correct on any host byte order (the former literal 16777343 only matched
 * 127.0.0.1 on little-endian machines).
 */
#define LOCAL_NET_ADDR htonl(INADDR_LOOPBACK)

#define MESSAGE_MAX 8192    // Size in bytes of the socket receive buffers
#define EVENTS_MAX 120000   // Event array capacity used when Create() gets nMax == 0

/* Reason codes handed to the EventCallback as its third argument */
#define XEVENT_ERROR 1      // EPOLLERR was reported for the descriptor
#define XEVENT_USER 2       // A READ/WRITE callback returned a negative value
#define XEVENT_READ 3       // EPOLLIN: descriptor is readable
#define XEVENT_WRITE 4      // EPOLLOUT: descriptor is writeable
#define XEVENT_HUNGED 5     // EPOLLHUP was reported for the descriptor
#define XEVENT_CLOSED 6     // EPOLLRDHUP: peer closed its end of the connection
#define XEVENT_CLEAR 7      // Event data is about to be freed
#define XEVENT_DESTROY 8    // The Events instance is being destroyed
#define XEVENT_EXCEPTION 9  // EPOLLPRI was reported for the descriptor
/*
 * Per-descriptor record. A pointer to it is stored in the epoll user data
 * and handed back to the event callback whenever the descriptor is ready.
 */
struct EventData_
{
    void *ptr;      // Caller supplied context pointer
    int events;     // Ready-event mask copied from the last epoll_wait() result
    int type;       // Caller defined event type
    int fd;         // Descriptor being monitored
};
typedef struct EventData_ EventData;
/*
 * Signature of the user callback driven by the event engine:
 *   events - the Events instance raising the callback
 *   data   - the EventData the reason applies to (NULL for XEVENT_DESTROY)
 *   reason - one of the XEVENT_* codes
 */
using EventCallback = int (*)(void *events, void *data, int reason);
class Events | |
{ | |
public: | |
~Events(); | |
bool Create(size_t nMax, void *pUser, EventCallback callBack); | |
EventData* Register(void *pCtx, int nFd, int nEvents, int nType); | |
bool Add(EventData* pData, int nEvents); | |
bool Modify(EventData *pData, int nEvents); | |
bool Delete(EventData *pData); | |
bool Service(int nTimeoutMs); | |
void *UserSpace() { return _pUserSpace; } | |
void ServiceCallback(EventData *pData); | |
void ClearCallback(EventData *pEvData); | |
private: | |
EventCallback _eventCallback = NULL; /* Service callback */ | |
struct epoll_event* _pEventArray = NULL; /* EPOLL event array */ | |
void* _pUserSpace = NULL; /* User space pointer */ | |
uint32_t _nEventMax = 0; /* Max allowed file descriptors */ | |
int _nEventFd = -1; /* EPOLL File decriptor */ | |
}; | |
/*
 * Close the epoll descriptor, free the event array and notify the user
 * callback with XEVENT_DESTROY (data pointer is NULL for this reason).
 */
Events::~Events()
{
    if (_nEventFd >= 0)
    {
        close(_nEventFd);
        _nEventFd = -1;
    }

    if (_pEventArray)
    {
        free(_pEventArray);
        _pEventArray = NULL;
    }

    /* The callback is only installed by Create(); an instance that was
     * never created must not call through a NULL function pointer */
    if (_eventCallback != NULL)
        _eventCallback(this, NULL, XEVENT_DESTROY);
}
bool Events::Create(size_t nMax, void *pUser, EventCallback callBack) | |
{ | |
_nEventMax = nMax > 0 ? nMax : EVENTS_MAX; | |
_eventCallback = callBack; | |
_pUserSpace = pUser; | |
/* Allocate memory for event array */ | |
_pEventArray = (struct epoll_event*)calloc(_nEventMax, sizeof(struct epoll_event)); | |
if (_pEventArray == NULL) | |
{ | |
fprintf(stderr, "Can not allocate memory for event array: %s\n", strerror(errno)); | |
return false; | |
} | |
/* Create event epoll instance */ | |
_nEventFd = epoll_create1(0); | |
if (_nEventFd < 0) | |
{ | |
fprintf(stderr, "Can not crerate epoll: %s\n", strerror(errno)); | |
free(_pEventArray); | |
_pEventArray = NULL; | |
return false; | |
} | |
return true; | |
} | |
/*
 * Dispatch the ready mask in pData->events to the user callback.
 * A hang-up/error style condition is reported alone (first match wins, in
 * the order RDHUP, HUP, ERR, PRI). Otherwise the WRITE and then the READ
 * callback run; a negative return from either raises XEVENT_USER and stops.
 */
void Events::ServiceCallback(EventData *pData)
{
    /* Pick the terminal reason, if any */
    int nReason = 0;
    if (pData->events & EPOLLRDHUP) nReason = XEVENT_CLOSED;
    else if (pData->events & EPOLLHUP) nReason = XEVENT_HUNGED;
    else if (pData->events & EPOLLERR) nReason = XEVENT_ERROR;
    else if (pData->events & EPOLLPRI) nReason = XEVENT_EXCEPTION;

    if (nReason != 0)
    {
        _eventCallback(this, pData, nReason);
        return;
    }

    /* Writeable descriptor */
    if (pData->events & EPOLLOUT)
    {
        if (_eventCallback(this, pData, XEVENT_WRITE) < 0)
        {
            _eventCallback(this, pData, XEVENT_USER); // User requested callback
            return;
        }
    }

    /* Readable descriptor */
    if (pData->events & EPOLLIN)
    {
        if (_eventCallback(this, pData, XEVENT_READ) < 0)
            _eventCallback(this, pData, XEVENT_USER); // User requested callback
    }
}
/*
 * Give the user callback a chance to release what pEvData refers to
 * (XEVENT_CLEAR) and then free the record itself. NULL is ignored.
 */
void Events::ClearCallback(EventData *pEvData)
{
    if (pEvData == NULL) return;

    _eventCallback(this, pEvData, XEVENT_CLEAR);
    free(pEvData);
}
/*
 * Create an EventData for nFd and start monitoring it.
 *   pCtx    - context pointer stored in the record
 *   nFd     - descriptor to monitor
 *   nEvents - epoll event mask
 *   nType   - caller defined type stored in the record
 * Returns the new record, or NULL when allocation or epoll_ctl() fails.
 */
EventData* Events::Register(void *pCtx, int nFd, int nEvents, int nType)
{
    EventData *pRecord = static_cast<EventData*>(malloc(sizeof(EventData)));
    if (!pRecord)
    {
        fprintf(stderr, "Can not allocate memory for event: %s\n", strerror(errno));
        return NULL;
    }

    /* Fill in the record before handing it to epoll */
    pRecord->fd = nFd;
    pRecord->ptr = pCtx;
    pRecord->type = nType;
    pRecord->events = 0;

    if (Add(pRecord, nEvents)) return pRecord;

    /* The record never reached epoll, drop it */
    free(pRecord);
    return NULL;
}
/*
 * Start monitoring pData->fd for nEvents; pData becomes the epoll user data.
 * Returns false (after logging to stderr) when epoll_ctl() fails.
 */
bool Events::Add(EventData* pData, int nEvents)
{
    struct epoll_event ev;
    ev.events = nEvents;
    ev.data.ptr = pData;

    const int nStatus = epoll_ctl(_nEventFd, EPOLL_CTL_ADD, pData->fd, &ev);
    if (nStatus >= 0) return true;

    fprintf(stderr, "epoll_ctl() failed: %s\n", strerror(errno));
    return false;
}
/*
 * Replace the event mask of an already monitored descriptor with nEvents.
 * Returns false (after logging to stderr) when epoll_ctl() fails.
 */
bool Events::Modify(EventData *pData, int nEvents)
{
    struct epoll_event ev;
    ev.events = nEvents;
    ev.data.ptr = pData;

    const int nStatus = epoll_ctl(_nEventFd, EPOLL_CTL_MOD, pData->fd, &ev);
    if (nStatus >= 0) return true;

    fprintf(stderr, "epoll_ctl() failed: %s\n", strerror(errno));
    return false;
}
/*
 * Stop monitoring pData->fd and release pData.
 * The descriptor is removed from epoll BEFORE the clear callback runs:
 * that callback may close the descriptor, after which EPOLL_CTL_DEL would
 * fail with EBADF. Returns true when the descriptor was removed from
 * epoll; pData is released in either case. NULL yields false.
 */
bool Events::Delete(EventData *pData)
{
    if (pData == NULL) return false;

    const bool bRemoved = (epoll_ctl(_nEventFd, EPOLL_CTL_DEL, pData->fd, NULL) == 0);
    ClearCallback(pData);

    return bRemoved;
}
bool Events::Service(int nTimeoutMs) | |
{ | |
int nCount; /* Wait for ready events */ | |
do nCount = epoll_wait(_nEventFd, _pEventArray, _nEventMax, nTimeoutMs); | |
while (errno == EINTR); | |
for (int i = 0; i < nCount; i++) | |
{ | |
/* Call callback for each ready event */ | |
EventData *pData = (EventData*)_pEventArray[i].data.ptr; | |
pData->events = _pEventArray[i].events; | |
ServiceCallback(pData); | |
} | |
return (nCount < 0) ? false : true; | |
} | |
/* Queue of complete messages waiting to be written to one client */
using TxBuffer = std::deque<std::string>;
class Client | |
{ | |
public: | |
typedef std::vector<Client*> Clients; | |
~Client() | |
{ | |
if (_nSock > 0) | |
{ | |
close(_nSock); | |
_nSock = -1; | |
} | |
} | |
void SetClients(Clients *pClients) { _pClients = pClients; } | |
void SetEventData(EventData *pData) { _pEvData = pData; } | |
void SetEventHandler(Events *pEvents) { _pEvents = pEvents; } | |
void Unsubscribe(); | |
void SendToAll(const char *pMessage); | |
std::string GetMessage(); | |
void AppendBuffer(const char *pMessage) { _rxBuffer.append(pMessage); } | |
void AdvanceBuffer(size_t nSize) { _rxBuffer.erase(0, nSize); } | |
const std::string& GetBuffer(size_t nSize) { return _rxBuffer; } | |
void SetIPAddr(const struct in_addr inAddr); | |
const char* GetIPAddr() { return _sAddress.c_str(); }; | |
void SetFD(int nFD) { _nSock = nFD; }; | |
int GetFD() { return _nSock; }; | |
private: | |
EventData* _pEvData; | |
Events* _pEvents; | |
Clients* _pClients; | |
std::string _sAddress; | |
std::string _rxBuffer; | |
TxBuffer _txBuffer; | |
int _nSock = -1; | |
}; | |
void Client::SetIPAddr(const struct in_addr inAddr) | |
{ | |
char sAddres[64]; | |
/* Get string from network byte order */ | |
int nLength = snprintf(sAddres, | |
sizeof(sAddres), "%d.%d.%d.%d", | |
(int)((inAddr.s_addr & 0x000000FF)), | |
(int)((inAddr.s_addr & 0x0000FF00)>>8), | |
(int)((inAddr.s_addr & 0x00FF0000)>>16), | |
(int)((inAddr.s_addr & 0xFF000000)>>24)); | |
_sAddress = std::string(sAddres, nLength); | |
} | |
std::string Client::GetMessage() | |
{ | |
if (_txBuffer.size()) | |
{ | |
std::string sMessage = _txBuffer.front(); | |
_txBuffer.pop_front(); | |
return sMessage; | |
} | |
return std::string(""); | |
} | |
void Client::SendToAll(const char *pMessage) | |
{ | |
for (size_t i = 0; i < _pClients->size(); i++) | |
{ | |
Client *pClient = _pClients->at(i); | |
std::string sMessage; | |
/* Dont send the message to the author */ | |
if (pClient->GetFD() == _nSock) continue; | |
/* Create final meesage (insert message athor at the begining) */ | |
sMessage = std::string("(") | |
+ _sAddress | |
+ std::string(")[") | |
+ std::to_string(_nSock) | |
+ std::string("]: ") | |
+ std::string(pMessage) | |
+ std::string("\n"); | |
/* Add message to the tx buffer */ | |
pClient->_txBuffer.push_back(std::string(sMessage)); | |
/* Mark client as writeable */ | |
int nEvents = EPOLLRDHUP | EPOLLIN | EPOLLOUT; | |
pClient->_pEvents->Modify(pClient->_pEvData, nEvents); | |
} | |
} | |
void Client::Unsubscribe() | |
{ | |
for (size_t i = 0; i < _pClients->size(); i++) | |
{ | |
Client *pClient = _pClients->at(i); | |
if (pClient->GetFD() == _nSock) | |
{ | |
/* Remove client from message bus */ | |
_pClients->erase(_pClients->begin() + i); | |
return; | |
} | |
} | |
} | |
/*
 * Create a TCP socket bound to nPort on all local interfaces and put it
 * into listening state.
 * Returns the descriptor, or -1 (after logging to stderr) on failure.
 */
int CreateListenerSocket(uint16_t nPort)
{
    struct sockaddr_in inAddr;
    memset(&inAddr, 0, sizeof(inAddr)); /* no indeterminate bytes reach bind() */
    inAddr.sin_family = AF_INET;
    inAddr.sin_port = htons(nPort);
    inAddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (fd < 0)
    {
        fprintf(stderr, "Failed to create listener socket: (%s)\n", strerror(errno));
        return -1;
    }

    /* Let a restarted server bind again while old connections on the port
     * are still in TIME_WAIT. Failure here is logged but not fatal */
    int nEnable = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &nEnable, sizeof(nEnable)) < 0)
        fprintf(stderr, "Failed to set SO_REUSEADDR on listener socket: (%s)\n", strerror(errno));

    /* Bind the socket on port */
    if (bind(fd, (struct sockaddr*)&inAddr, sizeof(inAddr)) < 0)
    {
        fprintf(stderr, "Failed to bind socket on port: %u (%s)\n", nPort, strerror(errno));
        close(fd);
        return -1;
    }

    /* Listen to the socket */
    if (listen(fd, 120000) < 0)
    {
        fprintf(stderr, "Failed to listen port: %u (%s)\n", nPort, strerror(errno));
        close(fd);
        return -1;
    }

    return fd;
}
int CreateClientSocket(uint16_t nPort) | |
{ | |
struct sockaddr_in sockAddr; | |
sockAddr.sin_family = AF_INET; | |
sockAddr.sin_port = htons(nPort); | |
sockAddr.sin_addr.s_addr = LOCAL_NET_ADDR; | |
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | |
if (fd < 0) | |
{ | |
fprintf(stderr, "Can not create client socket: %u (%s)\n", nPort, strerror(errno)); | |
return -1; | |
} | |
if (connect(fd, (struct sockaddr*)&sockAddr, sizeof(sockAddr)) < 0) | |
{ | |
fprintf(stderr, "Can not connect to the socket: %u (%s)\n", nPort, strerror(errno)); | |
close(fd); | |
return -1; | |
} | |
return fd; | |
} | |
/*
 * Switch nSock to non-blocking mode, keeping its other status flags.
 * Returns false (after logging to stderr) when either fcntl() call fails.
 */
bool NonBlock(int nSock)
{
    /* Current status flags of the descriptor */
    const int nFlags = fcntl(nSock, F_GETFL);
    if (nFlags < 0)
    {
        fprintf(stderr, "Failed fcntl(): %s\n", strerror(errno));
        return false;
    }

    /* Same flags with O_NONBLOCK added */
    if (fcntl(nSock, F_SETFL, nFlags | O_NONBLOCK) < 0)
    {
        fprintf(stderr, "Failed to non-block socket: %s\n", strerror(errno));
        return false;
    }

    return true;
}
int ReadEvent(Events *pEvents, EventData *pEvData) | |
{ | |
/* Get server socket descriptor */ | |
int nServerFd = (*(int*)pEvents->UserSpace()); | |
if (nServerFd == pEvData->fd) | |
{ | |
Client::Clients *pClients = (Client::Clients*)pEvData->ptr; | |
socklen_t len = sizeof(struct sockaddr); | |
struct sockaddr_in inAddr; | |
/* Acceot to the new connection request */ | |
int nClientFD = accept(nServerFd, (struct sockaddr*)&inAddr, &len); | |
if (nClientFD < 0) | |
{ | |
fprintf(stderr, "Can not accept to the socket: %s\n", strerror(errno)); | |
return 0; | |
} | |
/* Make non-block connection */ | |
if (!NonBlock(nClientFD)) | |
{ | |
close(nClientFD); | |
return 0; | |
} | |
/* Initialize new client */ | |
Client *pClient = new Client; | |
pClient->SetClients(pClients); | |
pClient->SetIPAddr(inAddr.sin_addr); | |
pClient->SetFD(nClientFD); | |
/* Register client into the event instance */ | |
EventData *pEvData = pEvents->Register(pClient, nClientFD, EPOLLIN, 0); | |
if (pEvData == NULL) | |
{ | |
delete pClient; | |
return 0; | |
} | |
/* Give client access to the event engine */ | |
pClient->SetEventHandler(pEvents); | |
pClient->SetEventData(pEvData); | |
printf("Connected client: %s (%d)\n", pClient->GetIPAddr(), nClientFD); | |
pClients->push_back(pClient); /* Subscribe client to the message bus */ | |
} | |
else | |
{ | |
Client *pClient = (Client*)pEvData->ptr; | |
int nClientFD = pEvData->fd; | |
char sRxBuffer[MESSAGE_MAX]; | |
/* Read incomming message from the client */ | |
int nBytes = read(nClientFD, sRxBuffer, sizeof(sRxBuffer)); | |
if (nBytes <= 0) | |
{ | |
if (!nBytes) printf("Disconnected client: %s[%d]\n", pClient->GetIPAddr(), nClientFD); | |
else fprintf(stderr, "Can not read data from client: %s (%s)\n", pClient->GetIPAddr(), strerror(errno)); | |
/* Delete client from the event instance */ | |
pClient->Unsubscribe(); | |
pEvents->Delete(pEvData); | |
return -1; | |
} | |
/* Null terminate receive buffer */ | |
sRxBuffer[nBytes] = '\0'; | |
/* Search new line in the message */ | |
char *pOffset = strstr(sRxBuffer, "\n"); | |
if (pOffset == NULL) /* Message is not complete */ | |
{ | |
/* Append message part to the RX buffer */ | |
pClient->AppendBuffer(sRxBuffer); | |
return 0; | |
} | |
size_t nLength = pOffset - sRxBuffer; | |
char sMessage[nLength]; | |
strncpy(sMessage, sRxBuffer, nLength); | |
sMessage[nLength] = '\0'; | |
printf("Received message from: client(%s)[%d]: %s\n", | |
pClient->GetIPAddr(), nClientFD, sMessage); | |
/* Send received message to all other clients */ | |
pClient->SendToAll(sMessage); | |
pClient->AdvanceBuffer(nLength); | |
} | |
return 0; | |
} | |
/*
 * XEVENT_WRITE handler: take the next queued message of the client and
 * write it to its socket. EPOLLOUT stays armed after a message was taken
 * so the following message is handled by the next event; once the queue
 * is empty the socket is re-armed for reading only. Always returns 0.
 */
int WriteEvent(Events *pEvents, EventData *pEvData)
{
    Client *pClient = (Client*)pEvData->ptr;
    int nEvents = EPOLLRDHUP | EPOLLIN;
    int nClientFD = pEvData->fd;

    /* Get ready message from bus and send to the client */
    std::string sMessage = pClient->GetMessage();
    if (sMessage.length())
    {
        size_t nSent = 0;

        /* send() may accept only a part of the message: continue with the rest */
        while (nSent < sMessage.length())
        {
            /* MSG_NOSIGNAL: a peer that went away must yield EPIPE here,
             * not a SIGPIPE that terminates the whole server */
            ssize_t nBytes = send(nClientFD, sMessage.data() + nSent,
                sMessage.length() - nSent, MSG_NOSIGNAL);

            if (nBytes < 0 && errno == EINTR) continue;

            if (nBytes <= 0)
            {
                /* The message was already removed from the queue, so the
                 * unsent remainder is lost - report it instead of hiding it */
                fprintf(stderr, "Can not send data to client: %s (%s)\n",
                    pClient->GetIPAddr(), strerror(errno));
                break;
            }

            nSent += (size_t)nBytes;
        }

        nEvents |= EPOLLOUT; /* Set enable client for writing */
    }

    pEvents->Modify(pEvData, nEvents);
    return 0;
}
void ClearEvent(EventData *pEvData) | |
{ | |
if (pEvData != NULL) | |
{ | |
if (pEvData->ptr != NULL) | |
{ | |
free(pEvData->ptr); | |
pEvData->ptr = NULL; | |
} | |
if (pEvData->fd >= 0) | |
{ | |
shutdown(pEvData->fd, SHUT_RDWR); | |
close(pEvData->fd); | |
pEvData->fd = -1; | |
} | |
} | |
} | |
int XEvent_Callback(void *events, void* data, int reason) | |
{ | |
EventData *pData = (EventData*)data; | |
Events *pEvents = (Events*)events; | |
int nServerFd = (*(int*)pEvents->UserSpace()); | |
switch(reason) | |
{ | |
case XEVENT_READ: | |
return ReadEvent(pEvents, pData); | |
case XEVENT_WRITE: | |
return WriteEvent(pEvents, pData); | |
case XEVENT_CLEAR: | |
ClearEvent(pData); | |
break; | |
case XEVENT_HUNGED: | |
printf("Connection hunged: fd(%d)\n", pData->fd); | |
pEvents->Delete(pData); | |
break; | |
case XEVENT_CLOSED: | |
printf("Connection closed: fd(%d)\n", pData->fd); | |
pEvents->Delete(pData); | |
break; | |
case XEVENT_DESTROY: | |
printf("Service destroyed\n"); | |
close(nServerFd); | |
break; | |
default: | |
break; | |
} | |
return 0; | |
} | |
void* LoggerThread(void *pContext) | |
{ | |
/* Get port for logger connection */ | |
int nPort = (*(int*)pContext); | |
/* Connect to the chat server */ | |
int nSock = CreateClientSocket(nPort); | |
if (nSock < 0) | |
{ | |
fprintf(stderr, "Can not connect to the message bus: %s", strerror(errno)); | |
return NULL; | |
} | |
while (true) | |
{ | |
char sBuffer[MESSAGE_MAX]; /* Read message from bus */ | |
int nBytes = read(nSock, sBuffer, sizeof(sBuffer)); | |
if (nBytes < 0) | |
{ | |
fprintf(stderr, "Can not read from message bus: %s", strerror(errno)); | |
close(nSock); | |
return NULL; | |
} | |
sBuffer[nBytes] = 0; // NULL terminate receive buffer | |
/* Dont try write empty message in the file */ | |
if (!strlen(sBuffer)) continue; | |
/* Open log file for appending */ | |
FILE *fp = fopen(LOG_FILE_PATH, "a"); | |
if (fp == NULL) | |
{ | |
fprintf(stderr, "Can not open file: %s (%s)", LOG_FILE_PATH, strerror(errno)); | |
continue; | |
} | |
/* Log received message to file */ | |
fprintf(fp, "%s", sBuffer); | |
fclose(fp); | |
} | |
close(nSock); | |
return NULL; | |
} | |
int main(int argc, char* argv[]) | |
{ | |
if (argc < 2) | |
{ | |
printf("Usage: %s <port>\n", argv[0]); | |
printf("Example: %s 8181\n", argv[0]); | |
return 1; | |
} | |
int nPort = atoi(argv[1]); | |
bool bStatus = true; | |
int nServerFd = CreateListenerSocket(nPort); | |
if (nServerFd < 0) return 1; | |
printf("Server started listen to port: %d\n", nPort); | |
Client::Clients allClients; | |
Events events; | |
/* Create event instance and add listener socket to the instance */ | |
if (!events.Create(0, &nServerFd, XEvent_Callback) || | |
!events.Register(&allClients, nServerFd, EPOLLIN, 0)) return 1; | |
pthread_t thread; | |
pthread_attr_t pattr; | |
/* Run logger thread and subscribe to the message bus */ | |
if (pthread_attr_init(&pattr) || | |
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED) || | |
pthread_create(&thread, &pattr, LoggerThread, &nPort) || | |
pthread_attr_destroy(&pattr)) | |
{ | |
fprintf(stderr, "Thread start failed with code: %s", strerror(errno)); | |
exit(EXIT_FAILURE); | |
} | |
while (bStatus) /* Main service loop */ | |
bStatus = events.Service(100); | |
close(nServerFd); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment