Skip to content

Instantly share code, notes, and snippets.

@mcchae
Created May 7, 2015 15:59
Show Gist options
  • Save mcchae/cacfa9c1a404d7b382f1 to your computer and use it in GitHub Desktop.
Save mcchae/cacfa9c1a404d7b382f1 to your computer and use it in GitHub Desktop.
Reliable UDP library - UDT sample code
#include <cstdlib>
#include <cstring>
#include <netdb.h>
#include <iostream>
#include <udt.h>
#include <stdio.h>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <unistd.h>
//========================================================================================
// Size in bytes (1024*1024) of the message buffer declared with this macro.
#define UDT_MAX_MSG (1024*1024)
// Log severity levels. Enumerators take ascending values starting at 0 for
// logERROR, so a numerically larger level is more verbose.
enum TLogLevel {
logERROR, logWARNING, logINFO,
logDEBUG, logDEBUG1, logDEBUG2,
logDEBUG3, logDEBUG4
};
// Current logging threshold: _logout() drops any message whose level is
// numerically greater than this value.
TLogLevel g_loglevel=logINFO;
//========================================================================================
// Appends a printf-style formatted message to the per-process debug file
// "uc_<pid>.txt" (pid zero-padded to 8 digits) in the current directory.
//
// The file is opened in append mode and closed again on every call.
// Output longer than the 10240-byte formatting buffer is truncated.
// If the file cannot be opened the message is silently dropped; debug
// tracing is best-effort and must never bring the program down.
//
//   fmt  printf-style format string
//   ...  arguments consumed by fmt
inline void _debug_printf(const char * fmt, ...)
{
    char dfname[512];
    snprintf(dfname, sizeof(dfname), "uc_%08d.txt", static_cast<int>(getpid()));
    FILE * afp = fopen(dfname, "a");
    if (afp == NULL)
        return; // nowhere to write: skip instead of passing NULL to fprintf
    char t[10240];
    va_list ap;
    va_start(ap, fmt);
    vsnprintf(t, sizeof(t), fmt, ap);
    va_end(ap);
    fprintf(afp, "%s", t);
    fclose(afp);
}
//========================================================================================
// Formats a printf-style message into the caller's buffer and prints it,
// followed by a newline, on stderr.
//
//   loglevel  severity of this message; it is suppressed when numerically
//             greater than the global g_loglevel threshold
//   t, t_len  destination buffer and its size (output is truncated to fit)
//   fmt, ...  printf-style format string and its arguments
//
// Returns t when the message was emitted, NULL when it was suppressed
// (in which case t is left untouched).
inline char * _logout(int loglevel, char *t, size_t t_len, const char * fmt, ...)
{
    const int threshold = static_cast<int>(g_loglevel);
    if (loglevel <= threshold)
    {
        va_list args;
        va_start(args, fmt);
        vsnprintf(t, t_len, fmt, args);
        va_end(args);
        fprintf(stderr,"%s\n", t);
        // syslog(LOG_INFO, "%s", t);
        return t;
    }
    return NULL;
}
//========================================================================================
// Sends a NUL-terminated text message over a UDT message-mode socket.
// The terminating '\0' is transmitted as part of the message.
//
//   client   connected UDT socket
//   msg      NUL-terminated text to send
//   ttl      forwarded to UDT::sendmsg (default -1)
//   inorder  forwarded to UDT::sendmsg (default false)
//
// Returns the value reported by UDT::sendmsg on success, or 0 after logging
// the UDT error when the send fails. The message is also appended to the
// debug trace file on success.
inline int _sendmsg(UDTSOCKET client, const char *msg, int ttl = -1, bool inorder = false)
{
    const int len = static_cast<int>(strlen(msg)); // '\0' terminated
    // ttl/inorder used to be accepted but ignored; pass them through so that
    // callers overriding the defaults actually get what they asked for.
    const int ss = UDT::sendmsg(client, msg, len + 1, ttl, inorder);
    if (UDT::ERROR == ss)
    {
        char errmsg[1024];
        _logout(logERROR, errmsg,sizeof(errmsg),
            "sendmsg:%s",UDT::getlasterror().getErrorMessage());
        return 0;
    }
    _debug_printf("msg:msg=<%s>\n", msg);
    return ss;
}
//========================================================================================
// Receives one message from a UDT message-mode socket into msg and treats
// it as NUL-terminated text.
//
//   recver   connected UDT socket
//   msg      destination buffer
//   msg_len  size of msg in bytes
//
// Returns the string length of the received text. Returns 0 when the
// receive fails (the UDT error is logged and msg is set to the empty
// string) or when msg/msg_len are unusable.
//
// The buffer is always terminated before it is used as a string: a peer
// that omits the trailing '\0' must not be able to make strlen()/"%s" run
// past the received data. If the message fills the whole buffer, its last
// byte is replaced by the terminator.
inline int _recvmsg(UDTSOCKET recver, char *msg, int msg_len)
{
    if (msg == NULL || msg_len <= 0)
        return 0;
    const int len = UDT::recvmsg(recver, msg, msg_len);
    if (UDT::ERROR == len)
    {
        char errmsg[1024];
        _logout(logERROR, errmsg,sizeof(errmsg),
            "recvmsg:%s",UDT::getlasterror().getErrorMessage());
        msg[0] = '\0'; // do not leave stale text from a previous message behind
        return 0;
    }
    msg[(len < msg_len) ? len : msg_len - 1] = '\0';
    _debug_printf("msg:msg=<%s>\n", msg);
    return static_cast<int>(strlen(msg)); // '\0' terminated
}
//========================================================================================
// Writes the current local time into str as "YYYYMMDDhhmmss" (14 digits
// plus the terminating '\0') and returns str.
//
//   str  destination buffer; must hold at least 15 bytes
//
// If the time cannot be broken down, str is set to the empty string.
inline char* _nowstr(char *str)
{
    const time_t now = time(NULL); // current time in seconds
    struct tm parts;
    // localtime_r fills a caller-owned struct instead of the shared static
    // buffer used by localtime(), and lets us detect failure instead of
    // dereferencing NULL.
    if (localtime_r(&now, &parts) == NULL)
    {
        str[0] = '\0';
        return str;
    }
    sprintf(str, "%04d%02d%02d%02d%02d%02d", parts.tm_year + 1900, parts.tm_mon + 1, parts.tm_mday,
        parts.tm_hour, parts.tm_min, parts.tm_sec);
    return str;
}
/****************************************************************************************/
using namespace std;
void* monitor(void*);
/****************************************************************************************/
int main(int argc, char* argv[])
{
char errmsg[1024];
if ((3 != argc) || (0 == atoi(argv[2])))
{
_logout(logERROR, errmsg,sizeof(errmsg), "usage: appclient server_ip server_port");
return 0;
}
struct addrinfo hints, *local, *peer;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
// hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = SOCK_DGRAM;
if (0 != getaddrinfo(NULL, "12345", &hints, &local))
{
_logout(logERROR, errmsg, sizeof(errmsg), "incorrect network address");
return 0;
}
UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);
freeaddrinfo(local);
if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer))
{
_logout(logERROR, errmsg, sizeof(errmsg), "incorrect server/peer address. %s:%s",argv[1],argv[2]);
return 0;
}
// connect to the server, implict bind
if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen))
{
_logout(logERROR, errmsg, sizeof(errmsg), "connect: %s",UDT::getlasterror().getErrorMessage());
return 0;
}
freeaddrinfo(peer);
// pthread_create(new pthread_t, NULL, monitor, &client);
char nowstr[32];
char msg[UDT_MAX_MSG];
sprintf(msg, "{'op':'Ping','_now':'%s'}", _nowstr(nowstr));
_sendmsg(client, msg);
_recvmsg(client, msg, sizeof(msg));
for (int i = 0; i < 10000; ++i) {
sprintf(msg, "{'op':'Req','value':%d,'_now':'%s'}", i, _nowstr(nowstr));
_sendmsg(client, msg);
_recvmsg(client, msg, sizeof(msg));
}
for (int i = 0; i < 10000; ++i) {
sprintf(msg, "{'op':'Info','value':%d,'_now':'%s'}", i, _nowstr(nowstr));
_sendmsg(client, msg);
}
sprintf(msg, "{'op':'Alert','value':'Urgent system error','_now':'%s'}", _nowstr(nowstr));
_sendmsg(client, msg);
sprintf(msg, "{'op':'Quit','_now':'%s'}", _nowstr(nowstr));
_sendmsg(client, msg);
UDT::close(client);
return 0;
}
/****************************************************************************************/
void* monitor(void* s)
{
UDTSOCKET u = *(UDTSOCKET*)s;
UDT::TRACEINFO perf;
cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;
while (true)
{
sleep(1);
if (UDT::ERROR == UDT::perfmon(u, &perf))
{
char errmsg[1024];
_logout(logERROR, errmsg, sizeof(errmsg),
"perfmon: %s",UDT::getlasterror().getErrorMessage());
break;
}
cout << perf.mbpsSendRate << "\t\t"
<< perf.msRTT << "\t"
<< perf.pktCongestionWindow << "\t"
<< perf.usPktSndPeriod << "\t\t\t"
<< perf.pktRecvACK << "\t"
<< perf.pktRecvNAK << endl;
}
return NULL;
}
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <netdb.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <strings.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <udt.h>
//========================================================================================
// Size in bytes (1024*1024) of the message buffers declared with this macro.
#define UDT_MAX_MSG (1024*1024)
// Log severity levels. Enumerators take ascending values starting at 0 for
// logERROR, so a numerically larger level is more verbose.
enum TLogLevel {
logERROR, logWARNING, logINFO,
logDEBUG, logDEBUG1, logDEBUG2,
logDEBUG3, logDEBUG4
};
// Current logging threshold: _logout() drops any message whose level is
// numerically greater than this value.
TLogLevel g_loglevel=logINFO;
//========================================================================================
// Appends a printf-style formatted message to the per-process debug file
// "us_<pid>.txt" (pid zero-padded to 8 digits) in the current directory.
//
// The file is opened in append mode and closed again on every call.
// Output longer than the 10240-byte formatting buffer is truncated.
// If the file cannot be opened the message is silently dropped; debug
// tracing is best-effort and must never bring the program down.
//
//   fmt  printf-style format string
//   ...  arguments consumed by fmt
inline void _debug_printf(const char * fmt, ...)
{
    char dfname[512];
    snprintf(dfname, sizeof(dfname), "us_%08d.txt", static_cast<int>(getpid()));
    FILE * afp = fopen(dfname, "a");
    if (afp == NULL)
        return; // nowhere to write: skip instead of passing NULL to fprintf
    char t[10240];
    va_list ap;
    va_start(ap, fmt);
    vsnprintf(t, sizeof(t), fmt, ap);
    va_end(ap);
    fprintf(afp, "%s", t);
    fclose(afp);
}
//========================================================================================
// Formats a printf-style message into the caller's buffer and prints it,
// followed by a newline, on stderr.
//
//   loglevel  severity of this message; it is suppressed when numerically
//             greater than the global g_loglevel threshold
//   t, t_len  destination buffer and its size (output is truncated to fit)
//   fmt, ...  printf-style format string and its arguments
//
// Returns t when the message was emitted, NULL when it was suppressed
// (in which case t is left untouched).
inline char * _logout(int loglevel, char *t, size_t t_len, const char * fmt, ...)
{
    const int threshold = static_cast<int>(g_loglevel);
    if (loglevel <= threshold)
    {
        va_list args;
        va_start(args, fmt);
        vsnprintf(t, t_len, fmt, args);
        va_end(args);
        fprintf(stderr,"%s\n", t);
        // syslog(LOG_INFO, "%s", t);
        return t;
    }
    return NULL;
}
//========================================================================================
// Sends a NUL-terminated text message over a UDT message-mode socket.
// The terminating '\0' is transmitted as part of the message.
//
//   client   connected UDT socket
//   msg      NUL-terminated text to send
//   ttl      forwarded to UDT::sendmsg (default -1)
//   inorder  forwarded to UDT::sendmsg (default false)
//
// Returns the value reported by UDT::sendmsg on success, or 0 after logging
// the UDT error when the send fails. The message is also appended to the
// debug trace file on success.
inline int _sendmsg(UDTSOCKET client, const char *msg, int ttl = -1, bool inorder = false)
{
    const int len = static_cast<int>(strlen(msg)); // '\0' terminated
    // ttl/inorder used to be accepted but ignored; pass them through so that
    // callers overriding the defaults actually get what they asked for.
    const int ss = UDT::sendmsg(client, msg, len + 1, ttl, inorder);
    if (UDT::ERROR == ss)
    {
        char errmsg[1024];
        _logout(logERROR, errmsg,sizeof(errmsg),
            "sendmsg:%s",UDT::getlasterror().getErrorMessage());
        return 0;
    }
    _debug_printf("msg:msg=<%s>\n", msg);
    return ss;
}
//========================================================================================
// Receives one message from a UDT message-mode socket into msg and treats
// it as NUL-terminated text.
//
//   server   connected UDT socket
//   msg      destination buffer
//   msg_len  size of msg in bytes
//
// Returns the string length of the received text. Returns 0 when the
// receive fails (the UDT error is logged and msg is set to the empty
// string) or when msg/msg_len are unusable.
//
// The buffer is always terminated before it is used as a string: a peer
// that omits the trailing '\0' must not be able to make strlen()/"%s" run
// past the received data. If the message fills the whole buffer, its last
// byte is replaced by the terminator.
inline int _recvmsg(UDTSOCKET server, char *msg, int msg_len)
{
    if (msg == NULL || msg_len <= 0)
        return 0;
    const int len = UDT::recvmsg(server, msg, msg_len);
    if (UDT::ERROR == len)
    {
        char errmsg[1024];
        _logout(logERROR, errmsg,sizeof(errmsg),
            "recvmsg:%s",UDT::getlasterror().getErrorMessage());
        msg[0] = '\0'; // do not leave stale text from a previous message behind
        return 0;
    }
    msg[(len < msg_len) ? len : msg_len - 1] = '\0';
    _debug_printf("msg:msg=<%s>\n", msg);
    return static_cast<int>(strlen(msg)); // '\0' terminated
}
//========================================================================================
// Writes the current local time into str as "YYYYMMDDhhmmss" (14 digits
// plus the terminating '\0') and returns str.
//
//   str  destination buffer; must hold at least 15 bytes
//
// If the time cannot be broken down, str is set to the empty string.
inline char* _nowstr(char *str)
{
    const time_t now = time(NULL); // current time in seconds
    struct tm parts;
    // This helper is called from several receiver threads. localtime()
    // hands out one shared static buffer, so concurrent calls could corrupt
    // each other's result; localtime_r fills a caller-owned struct instead
    // and lets us detect failure rather than dereferencing NULL.
    if (localtime_r(&now, &parts) == NULL)
    {
        str[0] = '\0';
        return str;
    }
    sprintf(str, "%04d%02d%02d%02d%02d%02d", parts.tm_year + 1900, parts.tm_mon + 1, parts.tm_mday,
        parts.tm_hour, parts.tm_min, parts.tm_sec);
    return str;
}
/****************************************************************************************/
using namespace std;
void* recvdata(void*);
/****************************************************************************************/
int main(int argc, char* argv[])
{
char errmsg[1024];
if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1]))))
{
_logout(logERROR, errmsg,sizeof(errmsg),"usage: appserver [server_port]");
return 0;
}
addrinfo hints;
addrinfo* res;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
// hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = SOCK_DGRAM;
string service("12345");
if (2 == argc)
service = argv[1];
if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res))
{
_logout(logERROR, errmsg,sizeof(errmsg),"illegal port number or port is busy");
return 0;
}
UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen))
{
_logout(logERROR, errmsg,sizeof(errmsg),"bind:%s",UDT::getlasterror().getErrorMessage());
return 0;
}
freeaddrinfo(res);
_logout(logINFO, errmsg,sizeof(errmsg),"server is ready at port:%s", service.c_str());
if (UDT::ERROR == UDT::listen(serv, 10))
{
_logout(logERROR, errmsg,sizeof(errmsg),"listen:%s",UDT::getlasterror().getErrorMessage());
return 0;
}
sockaddr_storage clientaddr;
int addrlen = sizeof(clientaddr);
UDTSOCKET server;
while (true)
{
if (UDT::INVALID_SOCK == (server = UDT::accept(serv, (sockaddr*)&clientaddr, &addrlen)))
{
cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
getnameinfo((sockaddr *)&clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST|NI_NUMERICSERV);
cout << "new connection: " << clienthost << ":" << clientservice << endl;
pthread_t rcvthread;
pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(server));
pthread_detach(rcvthread);
}
UDT::close(serv);
return 0;
}
/****************************************************************************************/
// Extracts the value of the first 'op':'<value>' pair found in msg.
//
//   msg        NUL-terminated message text (may be NULL)
//   opstr      destination buffer for the extracted value
//   opstr_len  size of opstr in bytes, including room for the terminator
//
// Returns opstr. On return opstr always holds a NUL-terminated string
// (unless opstr is NULL or opstr_len is 0, in which case nothing is
// written): the text between the key and the next single quote, truncated
// to opstr_len - 1 characters, or the empty string when the key is absent.
// A value with no closing quote is taken up to the end of msg.
char* getOpStr(const char* msg, char *opstr, size_t opstr_len)
{
    static const char kKey[] = "'op':'";
    if (opstr == NULL || opstr_len == 0)
        return opstr;
    opstr[0] = '\0'; // defined result even when the key is not found
    if (msg == NULL)
        return opstr;
    const char *value = strstr(msg, kKey);
    if (value == NULL)
        return opstr;
    value += sizeof(kKey) - 1; // skip the key, land on the first value character
    size_t n = 0;
    // "n + 1 < opstr_len" reserves the final byte for the terminator.
    while (value[n] != '\0' && value[n] != '\'' && n + 1 < opstr_len) {
        opstr[n] = value[n];
        ++n;
    }
    opstr[n] = '\0';
    return opstr;
}
/****************************************************************************************/
void* recvdata(void* usocket)
{
UDTSOCKET server = *(UDTSOCKET*)usocket;
delete (UDTSOCKET*)usocket;
char nowstr[32];
char msg[UDT_MAX_MSG], repmsg[UDT_MAX_MSG];
while (1) {
int rcnt = _recvmsg(server, msg, sizeof(msg));
if (rcnt <= 0) continue;
char opstr[64];
getOpStr(msg, opstr, sizeof(opstr));
if (!strcasecmp(opstr,"Ping")) {
// Pong
sprintf(repmsg, "{'op':'Pong','_now':'%s'}", _nowstr(nowstr));
_sendmsg(server, repmsg);
}
else if (!strcasecmp(opstr,"Req")) {
// reply
sprintf(repmsg, "{'op':'Rep','_now':'%s'}", _nowstr(nowstr));
_sendmsg(server, repmsg);
}
else if (!strcasecmp(opstr,"Info")) {
// info
}
else if (!strcasecmp(opstr,"Alert")) {
// Alert
}
else if (!strcasecmp(opstr,"Quit")) {
// Quit
break;
}
}
UDT::close(server);
return NULL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment