Skip to content

Instantly share code, notes, and snippets.

@kala13x
Last active November 26, 2021 21:34
Show Gist options
  • Save kala13x/622a6901a79abe3e9cbb3c4c012074e7 to your computer and use it in GitHub Desktop.
Save kala13x/622a6901a79abe3e9cbb3c4c012074e7 to your computer and use it in GitHub Desktop.
Simple chat server with subscribe pattern
/*
* upwork/singles/chat_server.cpp
*
* 2021 (c) Sun Dro ([email protected])
*
* EPOLL based high performance chat server with subscribe pattern
* Compile: g++ -g -O2 -Wall chat_server.cpp -o chat_server -lpthread
*/
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <deque>
#define LOG_FILE_PATH "chat_server_messages.log" // Example output
#define LOCAL_NET_ADDR 16777343 // 127.0.0.1 in network byte order
#define MESSAGE_MAX 8192
#define EVENTS_MAX 120000
#define XEVENT_ERROR 1
#define XEVENT_USER 2
#define XEVENT_READ 3
#define XEVENT_WRITE 4
#define XEVENT_HUNGED 5
#define XEVENT_CLOSED 6
#define XEVENT_CLEAR 7
#define XEVENT_DESTROY 8
#define XEVENT_EXCEPTION 9
typedef struct EventData_ {
void *ptr; // Data pointer
int events; // Ready events
int type; // Event type
int fd; // Socket descriptor
} EventData;
typedef int(*EventCallback)(void *events, void* data, int reason);
class Events
{
public:
~Events();
bool Create(size_t nMax, void *pUser, EventCallback callBack);
EventData* Register(void *pCtx, int nFd, int nEvents, int nType);
bool Add(EventData* pData, int nEvents);
bool Modify(EventData *pData, int nEvents);
bool Delete(EventData *pData);
bool Service(int nTimeoutMs);
void *UserSpace() { return _pUserSpace; }
void ServiceCallback(EventData *pData);
void ClearCallback(EventData *pEvData);
private:
EventCallback _eventCallback = NULL; /* Service callback */
struct epoll_event* _pEventArray = NULL; /* EPOLL event array */
void* _pUserSpace = NULL; /* User space pointer */
uint32_t _nEventMax = 0; /* Max allowed file descriptors */
int _nEventFd = -1; /* EPOLL File decriptor */
};
Events::~Events()
{
if (_nEventFd >= 0)
{
close(_nEventFd);
_nEventFd = -1;
}
if (_pEventArray)
{
free(_pEventArray);
_pEventArray = NULL;
}
_eventCallback(this, NULL, XEVENT_DESTROY);
}
bool Events::Create(size_t nMax, void *pUser, EventCallback callBack)
{
_nEventMax = nMax > 0 ? nMax : EVENTS_MAX;
_eventCallback = callBack;
_pUserSpace = pUser;
/* Allocate memory for event array */
_pEventArray = (struct epoll_event*)calloc(_nEventMax, sizeof(struct epoll_event));
if (_pEventArray == NULL)
{
fprintf(stderr, "Can not allocate memory for event array: %s\n", strerror(errno));
return false;
}
/* Create event epoll instance */
_nEventFd = epoll_create1(0);
if (_nEventFd < 0)
{
fprintf(stderr, "Can not crerate epoll: %s\n", strerror(errno));
free(_pEventArray);
_pEventArray = NULL;
return false;
}
return true;
}
void Events::ServiceCallback(EventData *pData)
{
/* Check error condition */
if (pData->events & EPOLLRDHUP) { _eventCallback(this, pData, XEVENT_CLOSED); return; }
if (pData->events & EPOLLHUP) { _eventCallback(this, pData, XEVENT_HUNGED); return; }
if (pData->events & EPOLLERR) { _eventCallback(this, pData, XEVENT_ERROR); return; }
if (pData->events & EPOLLPRI) { _eventCallback(this, pData, XEVENT_EXCEPTION); return; }
/* Callback on writeable */
if (pData->events & EPOLLOUT && _eventCallback(this, pData, XEVENT_WRITE) < 0)
{ _eventCallback(this, pData, XEVENT_USER); return; } // User requested callback
/* Callback on readable */
if (pData->events & EPOLLIN && _eventCallback(this, pData, XEVENT_READ) < 0)
{ _eventCallback(this, pData, XEVENT_USER); return; } // User requested callback
}
void Events::ClearCallback(EventData *pEvData)
{
if (pEvData != NULL)
{
_eventCallback(this, pEvData, XEVENT_CLEAR);
free(pEvData);
}
}
EventData* Events::Register(void *pCtx, int nFd, int nEvents, int nType)
{
/* Allocate memory for event data */
EventData* pData = (EventData*)malloc(sizeof(EventData));
if (pData == NULL)
{
fprintf(stderr, "Can not allocate memory for event: %s\n", strerror(errno));
return NULL;
}
/* Initialize event */
pData->events = 0;
pData->type = nType;
pData->ptr = pCtx;
pData->fd = nFd;
/* Add event to the instance */
if (!Add(pData, nEvents))
{
free(pData);
return NULL;
}
return pData;
}
bool Events::Add(EventData* pData, int nEvents)
{
struct epoll_event event;
event.data.ptr = pData;
event.events = nEvents;
if (epoll_ctl(_nEventFd, EPOLL_CTL_ADD, pData->fd, &event) < 0)
{
fprintf(stderr, "epoll_ctl() failed: %s\n", strerror(errno));
return false;
}
return true;
}
bool Events::Modify(EventData *pData, int nEvents)
{
struct epoll_event event;
event.data.ptr = pData;
event.events = nEvents;
if (epoll_ctl(_nEventFd, EPOLL_CTL_MOD, pData->fd, &event) < 0)
{
fprintf(stderr, "epoll_ctl() failed: %s\n", strerror(errno));
return false;
}
return true;
}
bool Events::Delete(EventData *pData)
{
int nEFD = pData->fd;
ClearCallback(pData);
if (epoll_ctl(_nEventFd, EPOLL_CTL_DEL, nEFD, NULL) < 0) return false;
return true;
}
bool Events::Service(int nTimeoutMs)
{
int nCount; /* Wait for ready events */
do nCount = epoll_wait(_nEventFd, _pEventArray, _nEventMax, nTimeoutMs);
while (errno == EINTR);
for (int i = 0; i < nCount; i++)
{
/* Call callback for each ready event */
EventData *pData = (EventData*)_pEventArray[i].data.ptr;
pData->events = _pEventArray[i].events;
ServiceCallback(pData);
}
return (nCount < 0) ? false : true;
}
typedef std::deque<std::string> TxBuffer;
class Client
{
public:
typedef std::vector<Client*> Clients;
~Client()
{
if (_nSock > 0)
{
close(_nSock);
_nSock = -1;
}
}
void SetClients(Clients *pClients) { _pClients = pClients; }
void SetEventData(EventData *pData) { _pEvData = pData; }
void SetEventHandler(Events *pEvents) { _pEvents = pEvents; }
void Unsubscribe();
void SendToAll(const char *pMessage);
std::string GetMessage();
void AppendBuffer(const char *pMessage) { _rxBuffer.append(pMessage); }
void AdvanceBuffer(size_t nSize) { _rxBuffer.erase(0, nSize); }
const std::string& GetBuffer(size_t nSize) { return _rxBuffer; }
void SetIPAddr(const struct in_addr inAddr);
const char* GetIPAddr() { return _sAddress.c_str(); };
void SetFD(int nFD) { _nSock = nFD; };
int GetFD() { return _nSock; };
private:
EventData* _pEvData;
Events* _pEvents;
Clients* _pClients;
std::string _sAddress;
std::string _rxBuffer;
TxBuffer _txBuffer;
int _nSock = -1;
};
void Client::SetIPAddr(const struct in_addr inAddr)
{
char sAddres[64];
/* Get string from network byte order */
int nLength = snprintf(sAddres,
sizeof(sAddres), "%d.%d.%d.%d",
(int)((inAddr.s_addr & 0x000000FF)),
(int)((inAddr.s_addr & 0x0000FF00)>>8),
(int)((inAddr.s_addr & 0x00FF0000)>>16),
(int)((inAddr.s_addr & 0xFF000000)>>24));
_sAddress = std::string(sAddres, nLength);
}
std::string Client::GetMessage()
{
if (_txBuffer.size())
{
std::string sMessage = _txBuffer.front();
_txBuffer.pop_front();
return sMessage;
}
return std::string("");
}
void Client::SendToAll(const char *pMessage)
{
for (size_t i = 0; i < _pClients->size(); i++)
{
Client *pClient = _pClients->at(i);
std::string sMessage;
/* Dont send the message to the author */
if (pClient->GetFD() == _nSock) continue;
/* Create final meesage (insert message athor at the begining) */
sMessage = std::string("(")
+ _sAddress
+ std::string(")[")
+ std::to_string(_nSock)
+ std::string("]: ")
+ std::string(pMessage)
+ std::string("\n");
/* Add message to the tx buffer */
pClient->_txBuffer.push_back(std::string(sMessage));
/* Mark client as writeable */
int nEvents = EPOLLRDHUP | EPOLLIN | EPOLLOUT;
pClient->_pEvents->Modify(pClient->_pEvData, nEvents);
}
}
void Client::Unsubscribe()
{
for (size_t i = 0; i < _pClients->size(); i++)
{
Client *pClient = _pClients->at(i);
if (pClient->GetFD() == _nSock)
{
/* Remove client from message bus */
_pClients->erase(_pClients->begin() + i);
return;
}
}
}
int CreateListenerSocket(uint16_t nPort)
{
struct sockaddr_in inAddr;
inAddr.sin_family = AF_INET;
inAddr.sin_port = htons(nPort);
inAddr.sin_addr.s_addr = htonl(INADDR_ANY);;
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0)
{
fprintf(stderr, "Failed to create listener socket: (%s)\n", strerror(errno));
return -1;
}
/* Bind the socket on port */
if (bind(fd, (struct sockaddr*)&inAddr, sizeof(inAddr)) < 0)
{
fprintf(stderr, "Failed to bind socket on port: %u (%s)\n", nPort, strerror(errno));
close(fd);
return -1;
}
/* Listen to the socket */
if (listen(fd, 120000) < 0)
{
fprintf(stderr, "Failed to listen port: %u (%s)\n", nPort, strerror(errno));
close(fd);
return -1;
}
return fd;
}
int CreateClientSocket(uint16_t nPort)
{
struct sockaddr_in sockAddr;
sockAddr.sin_family = AF_INET;
sockAddr.sin_port = htons(nPort);
sockAddr.sin_addr.s_addr = LOCAL_NET_ADDR;
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0)
{
fprintf(stderr, "Can not create client socket: %u (%s)\n", nPort, strerror(errno));
return -1;
}
if (connect(fd, (struct sockaddr*)&sockAddr, sizeof(sockAddr)) < 0)
{
fprintf(stderr, "Can not connect to the socket: %u (%s)\n", nPort, strerror(errno));
close(fd);
return -1;
}
return fd;
}
bool NonBlock(int nSock)
{
/* Get active flags on the socket */
int fl = fcntl(nSock, F_GETFL);
if (fl < 0)
{
fprintf(stderr, "Failed fcntl(): %s\n", strerror(errno));
return false;
}
/* Set non-block flag */
fl = fcntl(nSock, F_SETFL, fl | O_NONBLOCK);
if (fl < 0)
{
fprintf(stderr, "Failed to non-block socket: %s\n", strerror(errno));
return false;
}
return true;
}
int ReadEvent(Events *pEvents, EventData *pEvData)
{
/* Get server socket descriptor */
int nServerFd = (*(int*)pEvents->UserSpace());
if (nServerFd == pEvData->fd)
{
Client::Clients *pClients = (Client::Clients*)pEvData->ptr;
socklen_t len = sizeof(struct sockaddr);
struct sockaddr_in inAddr;
/* Acceot to the new connection request */
int nClientFD = accept(nServerFd, (struct sockaddr*)&inAddr, &len);
if (nClientFD < 0)
{
fprintf(stderr, "Can not accept to the socket: %s\n", strerror(errno));
return 0;
}
/* Make non-block connection */
if (!NonBlock(nClientFD))
{
close(nClientFD);
return 0;
}
/* Initialize new client */
Client *pClient = new Client;
pClient->SetClients(pClients);
pClient->SetIPAddr(inAddr.sin_addr);
pClient->SetFD(nClientFD);
/* Register client into the event instance */
EventData *pEvData = pEvents->Register(pClient, nClientFD, EPOLLIN, 0);
if (pEvData == NULL)
{
delete pClient;
return 0;
}
/* Give client access to the event engine */
pClient->SetEventHandler(pEvents);
pClient->SetEventData(pEvData);
printf("Connected client: %s (%d)\n", pClient->GetIPAddr(), nClientFD);
pClients->push_back(pClient); /* Subscribe client to the message bus */
}
else
{
Client *pClient = (Client*)pEvData->ptr;
int nClientFD = pEvData->fd;
char sRxBuffer[MESSAGE_MAX];
/* Read incomming message from the client */
int nBytes = read(nClientFD, sRxBuffer, sizeof(sRxBuffer));
if (nBytes <= 0)
{
if (!nBytes) printf("Disconnected client: %s[%d]\n", pClient->GetIPAddr(), nClientFD);
else fprintf(stderr, "Can not read data from client: %s (%s)\n", pClient->GetIPAddr(), strerror(errno));
/* Delete client from the event instance */
pClient->Unsubscribe();
pEvents->Delete(pEvData);
return -1;
}
/* Null terminate receive buffer */
sRxBuffer[nBytes] = '\0';
/* Search new line in the message */
char *pOffset = strstr(sRxBuffer, "\n");
if (pOffset == NULL) /* Message is not complete */
{
/* Append message part to the RX buffer */
pClient->AppendBuffer(sRxBuffer);
return 0;
}
size_t nLength = pOffset - sRxBuffer;
char sMessage[nLength];
strncpy(sMessage, sRxBuffer, nLength);
sMessage[nLength] = '\0';
printf("Received message from: client(%s)[%d]: %s\n",
pClient->GetIPAddr(), nClientFD, sMessage);
/* Send received message to all other clients */
pClient->SendToAll(sMessage);
pClient->AdvanceBuffer(nLength);
}
return 0;
}
int WriteEvent(Events *pEvents, EventData *pEvData)
{
Client *pClient = (Client*)pEvData->ptr;
int nEvents = EPOLLRDHUP | EPOLLIN;
int nClientFD = pEvData->fd;
/* Get ready message from bus and send to the client */
std::string sMessage = pClient->GetMessage();
if (sMessage.length())
{
send(nClientFD, sMessage.c_str(), sMessage.length(), 0);
nEvents |= EPOLLOUT; /* Set enable client for writing */
}
pEvents->Modify(pEvData, nEvents);
return 0;
}
void ClearEvent(EventData *pEvData)
{
if (pEvData != NULL)
{
if (pEvData->ptr != NULL)
{
free(pEvData->ptr);
pEvData->ptr = NULL;
}
if (pEvData->fd >= 0)
{
shutdown(pEvData->fd, SHUT_RDWR);
close(pEvData->fd);
pEvData->fd = -1;
}
}
}
int XEvent_Callback(void *events, void* data, int reason)
{
EventData *pData = (EventData*)data;
Events *pEvents = (Events*)events;
int nServerFd = (*(int*)pEvents->UserSpace());
switch(reason)
{
case XEVENT_READ:
return ReadEvent(pEvents, pData);
case XEVENT_WRITE:
return WriteEvent(pEvents, pData);
case XEVENT_CLEAR:
ClearEvent(pData);
break;
case XEVENT_HUNGED:
printf("Connection hunged: fd(%d)\n", pData->fd);
pEvents->Delete(pData);
break;
case XEVENT_CLOSED:
printf("Connection closed: fd(%d)\n", pData->fd);
pEvents->Delete(pData);
break;
case XEVENT_DESTROY:
printf("Service destroyed\n");
close(nServerFd);
break;
default:
break;
}
return 0;
}
void* LoggerThread(void *pContext)
{
/* Get port for logger connection */
int nPort = (*(int*)pContext);
/* Connect to the chat server */
int nSock = CreateClientSocket(nPort);
if (nSock < 0)
{
fprintf(stderr, "Can not connect to the message bus: %s", strerror(errno));
return NULL;
}
while (true)
{
char sBuffer[MESSAGE_MAX]; /* Read message from bus */
int nBytes = read(nSock, sBuffer, sizeof(sBuffer));
if (nBytes < 0)
{
fprintf(stderr, "Can not read from message bus: %s", strerror(errno));
close(nSock);
return NULL;
}
sBuffer[nBytes] = 0; // NULL terminate receive buffer
/* Dont try write empty message in the file */
if (!strlen(sBuffer)) continue;
/* Open log file for appending */
FILE *fp = fopen(LOG_FILE_PATH, "a");
if (fp == NULL)
{
fprintf(stderr, "Can not open file: %s (%s)", LOG_FILE_PATH, strerror(errno));
continue;
}
/* Log received message to file */
fprintf(fp, "%s", sBuffer);
fclose(fp);
}
close(nSock);
return NULL;
}
int main(int argc, char* argv[])
{
if (argc < 2)
{
printf("Usage: %s <port>\n", argv[0]);
printf("Example: %s 8181\n", argv[0]);
return 1;
}
int nPort = atoi(argv[1]);
bool bStatus = true;
int nServerFd = CreateListenerSocket(nPort);
if (nServerFd < 0) return 1;
printf("Server started listen to port: %d\n", nPort);
Client::Clients allClients;
Events events;
/* Create event instance and add listener socket to the instance */
if (!events.Create(0, &nServerFd, XEvent_Callback) ||
!events.Register(&allClients, nServerFd, EPOLLIN, 0)) return 1;
pthread_t thread;
pthread_attr_t pattr;
/* Run logger thread and subscribe to the message bus */
if (pthread_attr_init(&pattr) ||
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED) ||
pthread_create(&thread, &pattr, LoggerThread, &nPort) ||
pthread_attr_destroy(&pattr))
{
fprintf(stderr, "Thread start failed with code: %s", strerror(errno));
exit(EXIT_FAILURE);
}
while (bStatus) /* Main service loop */
bStatus = events.Service(100);
close(nServerFd);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment