Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save azat/c0f5ff1a9ccb531d2fe19c37e32a48ea to your computer and use it in GitHub Desktop.
Save azat/c0f5ff1a9ccb531d2fe19c37e32a48ea to your computer and use it in GitHub Desktop.
// https://github.com/libevent/libevent/pull/208
#define _GNU_SOURCE
#include <pthread.h>
#include <assert.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/event.h>
#define ITERATIONS 700000
#define ITERATIONS_SIZE 10
struct event_base *base;
struct bufferevent *pair[2];
struct evstat
{
size_t read;
size_t readed;
size_t write;
size_t disable;
size_t free;
} stat;
void prstat()
{
fprintf(stderr, "stat> read: %zu, readed: %zu, write: %zu (%zu), disabled: %zu\n",
stat.read, stat.readed, stat.write, evbuffer_get_length(bufferevent_get_input(pair[1])), stat.disable);
}
void readcb(struct bufferevent *bev, void *arg)
{
struct evbuffer *evbuf;
assert(evbuf = evbuffer_new());
bufferevent_read_buffer(bev, evbuf);
stat.readed += evbuffer_get_length(evbuf);
evbuffer_free(evbuf);
++stat.read;
if (stat.readed == (ITERATIONS * ITERATIONS_SIZE)) {
event_base_loopexit(base, 0);
}
}
void *endiscb(void *arg)
{
size_t i;
for (i = 0; ; ++i) {
if (stat.free) {
break;
}
if (!(i % 100)) {
assert(!bufferevent_disable(pair[1], EV_READ));
assert(!bufferevent_enable(pair[1], EV_READ));
++stat.disable;
}
}
}
void *writercb(void *arg)
{
char buffer[ITERATIONS_SIZE] = { 0 };
size_t i;
for (i = 0; i < ITERATIONS; ++i) {
bufferevent_write(pair[0], buffer, sizeof(buffer));
++stat.write;
if (!(i % 100000)) {
prstat();
}
}
}
void *loopcb(void *arg)
{
event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
}
void firstcb(evutil_socket_t fd , short what, void *arg)
{
sleep(5);
}
int main(int argc, char **argv)
{
pthread_t writer;
pthread_t endis;
pthread_t loop;
evutil_socket_t pipe[2];
evthread_use_pthreads();
assert(base = event_base_new());
event_base_once(base, -1, EV_TIMEOUT, firstcb, NULL, NULL);
evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, pipe);
evutil_make_socket_nonblocking(pipe[0]);
evutil_make_socket_nonblocking(pipe[1]);
pair[0] = bufferevent_socket_new(base, pipe[0], BEV_OPT_THREADSAFE);
pair[1] = bufferevent_socket_new(base, pipe[1], BEV_OPT_THREADSAFE);
bufferevent_setcb(pair[1], readcb, NULL, NULL, NULL);
assert(!bufferevent_disable(pair[0], EV_READ));
assert(!bufferevent_enable(pair[1], EV_READ));
assert(!pthread_create(&writer, NULL, writercb, NULL));
assert(!pthread_create(&endis, NULL, endiscb, NULL));
assert(!pthread_create(&loop, NULL, loopcb, NULL));
assert(!pthread_join(loop, NULL));
assert(!pthread_join(writer, NULL));
stat.free = 1;
assert(!pthread_join(endis, NULL));
prstat();
bufferevent_free(pair[0]);
bufferevent_free(pair[1]);
event_base_free(base);
#ifdef LIBEVENT_2_1
libevent_global_shutdown();
#endif
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment