Skip to content

Instantly share code, notes, and snippets.

@airekans
Last active January 1, 2016 10:29
Show Gist options
  • Save airekans/8131708 to your computer and use it in GitHub Desktop.
Save airekans/8131708 to your computer and use it in GitHub Desktop.
All kinds of echo server implementations.
/*
This exmple program provides a trivial echo server program that listens for TCP.
Where possible, it exits cleanly in response to a SIGINT (ctrl-c).
*/
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <ctime>
#ifndef WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
static const int PORT = 9995;
static void listener_cb(struct evconnlistener *, evutil_socket_t,
struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
int
main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;
struct sockaddr_in sin;
#ifdef WIN32
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);
#endif
base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin,
sizeof(sin));
if (!listener) {
fprintf(stderr, "Could not create a listener!\n");
return 1;
}
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
if (!signal_event || event_add(signal_event, NULL)<0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}
event_base_dispatch(base);
evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);
printf("done\n");
return 0;
}
static void echo(struct bufferevent *bev, const char* buf, const unsigned buf_len)
{
static unsigned req_count = 0;
if (++req_count > 100000)
{
printf("serve 100000 reqs: time: %u\n", (unsigned) time(0));
req_count = 0;
}
// simulate the processing time used to process the request.
int count = 0;
for (int i = 0; i < 1000; ++i)
{
++count;
}
bufferevent_write(bev, buf, buf_len);
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
static void
conn_readcb(struct bufferevent *bev, void *user_data)
{
static const int BUF_SIZE = 4 * 1024;
static char buf[BUF_SIZE] = {0};
//static const char* const buf_end = buf + BUF_SIZE;
static unsigned pre_data_len = 0;
size_t n = 0;
char* cur = buf;
char* p = NULL;
while ((n = bufferevent_read(bev, buf + pre_data_len, BUF_SIZE - pre_data_len - 1)) > 0)
{
unsigned whole_data_len = n + pre_data_len;
buf[whole_data_len] = '\0';
cur = buf;
while ((cur < buf + whole_data_len) && (p = strchr(cur, '\n')) != NULL)
{
unsigned data_len = p - cur + 1;
// call the user function
echo(bev, cur, data_len);
cur += data_len;
}
// move the rest content to the head of the buffer.
if (cur < buf + whole_data_len)
{
memmove(buf, cur, (buf + whole_data_len) - cur);
}
pre_data_len = (buf + whole_data_len) - cur;
}
}
static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data)
{
struct event_base *base = (struct event_base *) user_data;
struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
fprintf(stderr, "Error constructing bufferevent!");
event_base_loopbreak(base);
return;
}
printf("recv user connection\n");
bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);
bufferevent_enable(bev, EV_READ);
//bufferevent_disable(bev, EV_READ);
}
static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
struct evbuffer *output = bufferevent_get_output(bev);
if (evbuffer_get_length(output) == 0) {
//printf("flushed answer\n");
bufferevent_disable(bev, EV_WRITE);
//bufferevent_free(bev);
}
}
static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
if (events & BEV_EVENT_EOF) {
printf("Connection closed.\n");
} else if (events & BEV_EVENT_ERROR) {
printf("Got an error on the connection: %s\n",
strerror(errno));/*XXX win32*/
}
/* None of the other events can happen here, since we haven't enabled
* timeouts */
bufferevent_free(bev);
}
static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = (struct event_base *) user_data;
struct timeval delay = { 2, 0 };
printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
event_base_loopexit(base, &delay);
}
// This client uses short connection to send a request to the server in every connection.
// This client is mainly used to do benchmark for the server.
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <signal.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdint.h>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cassert>
#include <set>
using namespace std;
static unsigned g_req_num_per_sec = 100;
static struct sockaddr_in sin;
static vector<unsigned> g_delays;
static unsigned g_timeout_conn_num = 0;
static vector<timeval> g_timeval_pool;
static set<unsigned> g_free_timeval_idx;
static unsigned alloc_timeval()
{
static const unsigned INIT_SIZE = 64;
if (g_free_timeval_idx.empty())
{
if (g_timeval_pool.empty())
{
g_timeval_pool.resize(INIT_SIZE);
for (unsigned i = 0; i < INIT_SIZE; ++i)
{
g_free_timeval_idx.insert(i);
}
}
else
{
const unsigned old_size = g_timeval_pool.size();
g_timeval_pool.resize(old_size * 2);
for (unsigned i = old_size; i < g_timeval_pool.size(); ++i)
{
g_free_timeval_idx.insert(i);
}
}
}
set<unsigned>::iterator first_elem = g_free_timeval_idx.begin();
unsigned alloc_idx = *first_elem;
g_free_timeval_idx.erase(first_elem);
return alloc_idx;
}
inline timeval& get_timeval(const unsigned idx)
{
assert(idx < g_timeval_pool.size());
return g_timeval_pool[idx];
}
static void free_timeval(const unsigned idx)
{
g_free_timeval_idx.insert(idx);
}
char* get_buffer(unsigned* len)
{
static bool is_init = false;
static const unsigned buf_size = 1024;
static char buffer[buf_size] = {0};
if (!is_init)
{
is_init = true;
memset(buffer + sizeof(uint32_t), (int) 'a', buf_size - sizeof(uint32_t));
buffer[buf_size - 1] = '\n';
}
*len = buf_size;
return buffer;
}
static void signal_int_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = (struct event_base *) user_data;
printf("Caught an interrupt signal; exiting cleanly in 0 seconds.\n");
event_base_loopexit(base, NULL);
}
static void conn_readcb(struct bufferevent *bev, void *user_data)
{
struct evbuffer *input = bufferevent_get_input(bev);
unsigned req_len = 0;
get_buffer(&req_len);
if (evbuffer_get_length(input) == req_len)
{
timeval cur_time;
gettimeofday(&cur_time, NULL);
const unsigned timeval_idx = (unsigned) user_data;
timeval* begin_time = &get_timeval(timeval_idx);
unsigned diff_msec = (cur_time.tv_sec - begin_time->tv_sec) * 1000000 + \
(cur_time.tv_usec - begin_time->tv_usec);
g_delays.push_back(diff_msec);
///cerr << "delay " << diff_msec << " microseconds" << endl;
free_timeval(timeval_idx);
evbuffer_drain(input, req_len);
bufferevent_free(bev);
}
}
static void conn_writecb(struct bufferevent *bev, void *user_data)
{
struct evbuffer *output = bufferevent_get_output(bev);
if (evbuffer_get_length(output) == 0) {
bufferevent_disable(bev, EV_WRITE);
bufferevent_enable(bev, EV_READ);
timeval timeout = {2, 0};
bufferevent_set_timeouts(bev, &timeout, NULL);
timeval* delay_time = &get_timeval((unsigned) user_data);
gettimeofday(delay_time, NULL);
}
}
void eventcb(struct bufferevent *bev, short events, void *user_data)
{
if (events & BEV_EVENT_CONNECTED) {
unsigned buf_len = 0;
char* buf = get_buffer(&buf_len);
evutil_socket_t soc = bufferevent_getfd(bev);
struct linger soc_linger = {1, 0};
setsockopt((int) soc, SOL_SOCKET, SO_LINGER, &soc_linger, sizeof(soc_linger));
bufferevent_write(bev, buf, buf_len);
bufferevent_enable(bev, EV_WRITE);
} else if (events & BEV_EVENT_ERROR) {
/* An error occured while connecting. */
cerr << "an error occured when connecting" << endl;
//free_timeval((unsigned) user_data);
//bufferevent_free(bev);
} else if (events & BEV_EVENT_TIMEOUT) {
free_timeval((unsigned) user_data);
bufferevent_free(bev);
++g_timeout_conn_num;
}
}
static void send_reqs(const unsigned req_num, struct event_base* base)
{
struct bufferevent *bev;
for (unsigned i = 0; i < req_num; ++i)
{
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
unsigned time_val_idx = alloc_timeval();
bufferevent_setcb(bev, conn_readcb, conn_writecb, eventcb, (void *)time_val_idx);
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
free_timeval(time_val_idx);
bufferevent_free(bev);
cerr << "error connecting to host" << endl;
}
}
cerr << "send " << req_num << " reqs" << endl;
}
static void signal_alarm_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = (struct event_base *) user_data;
// avg delay time
if (!g_delays.empty())
{
unsigned total_delay = 0;
for (unsigned i = 0; i < g_delays.size(); ++i)
{
total_delay += g_delays[i];
}
cerr << "recv " << g_delays.size() << " rsp. avg delay: " << total_delay / g_delays.size()
<< " timeout num: " << g_timeout_conn_num << endl;
g_timeout_conn_num = 0;
g_delays.clear();
}
send_reqs(g_req_num_per_sec, base);
alarm(1);
}
int main_loop(int argc, char** argv)
{
if (argc < 3)
{
cerr << "Usage: " << argv[0] << " HOST_IP PORT [REQ_NUM]" << endl;
return 0;
}
const char* host_ip = argv[1];
const char* port_num = argv[2];
if (argc >= 4)
{
g_req_num_per_sec = atoi(argv[3]);
}
struct event_base *base;
//struct bufferevent *bev;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
//sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
if (inet_pton(AF_INET, host_ip, &sin.sin_addr) <= 0)
{
cerr << "Invalid host ip: " << host_ip << endl;
return 1;
}
sin.sin_port = htons(atoi(port_num)); /* Port 8080 */
base = event_base_new();
// signal event
struct event *signal_event = evsignal_new(base, SIGINT, signal_int_cb, (void *)base);
if (!signal_event || event_add(signal_event, NULL) < 0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}
struct event *alarm_signal_event = evsignal_new(base, SIGALRM, signal_alarm_cb, (void *)base);
if (!alarm_signal_event || event_add(alarm_signal_event, NULL) < 0) {
fprintf(stderr, "Could not create/add a alarm signal event!\n");
return 1;
}
send_reqs(g_req_num_per_sec, base);
alarm(1);
event_base_dispatch(base);
event_free(signal_event);
event_base_free(base);
return 0;
}
int main(int argc, char** argv)
{
return main_loop(argc, argv);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment