Skip to content

Instantly share code, notes, and snippets.

@apofiget
Created November 4, 2016 21:07
Show Gist options
  • Save apofiget/eef3f7b6734d4ffc4e56148eeebe9d8f to your computer and use it in GitHub Desktop.
Save apofiget/eef3f7b6734d4ffc4e56148eeebe9d8f to your computer and use it in GitHub Desktop.
Libev complete example, http://slonik-v-domene.livejournal.com
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include <ev.h>
#include <fcntl.h>
#include <pthread.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>
/*
 * Worker thread context.
 *
 * Holds the two descriptors of the socket pair used as the job queue:
 * network threads send on out_queue, worker threads receive on in_queue.
 */
typedef struct _wrk_ctx
{
    int out_queue;  /* Outgoing queue socket (written by queue_callback) */
    int in_queue;   /* Incoming queue socket (read by worker_thr_func) */
} WRK_CTX;
/*
 * Network thread context.
 *
 * One instance is shared by all network threads; each of them watches
 * the same listening socket and reaches the worker queue through wrk_ctx.
 */
typedef struct _net_ctx
{
    int       sock;     /* Server listener */
    WRK_CTX * wrk_ctx;  /* Worker context (queue sockets) */
} NET_CTX;
/*
 * Client connection context.
 *
 * Allocated in accept_callback, one per accepted client. A pointer to it
 * is what travels through the queue socket to the worker threads.
 */
typedef struct _io_ctx
{
    NET_CTX *        net_ctx;    /* Network context of the accepting thread */
    struct ev_loop * loop;       /* Event loop the connection was accepted on */
    int              sock;       /* Client socket */
    struct ev_io     io_queue;   /* Watcher: outgoing queue socket is writable */
    struct ev_io     io_net;     /* Watcher: client socket readable / writable */
    struct ev_async  async;      /* Async watcher signalled by the worker thread */
    char             buf[4096];  /* Data received from / sent back to the client */
    int              buf_len;    /* Number of valid bytes in buf (recv result) */
} IO_CTX;
/*
 * Async callback.
 *
 * Invoked on the network loop once the async watcher of a connection has
 * been signalled (worker_thr_func does so via ev_async_send). Disarms the
 * async watcher and re-arms the client watcher for EV_WRITE so that
 * client_callback sends the buffer back.
 *
 * loop    - event loop that owns the watcher
 * watcher - async watcher; its data field points to the IO_CTX
 * rev     - received event mask
 */
static void async_callback(struct ev_loop * loop,
                           struct ev_async * watcher,
                           int rev)
{
    IO_CTX * conn;

    if ((rev & EV_ERROR) != 0)
    {
        fprintf(stderr, "async_callback/EV_ERROR\n");
        return;
    }

    conn = (IO_CTX *)(watcher -> data);

    /* Work is done: no further notifications are expected for this request */
    ev_async_stop(loop, watcher);

    /* Switch the client socket watcher from reading to writing */
    ev_io_set(&(conn -> io_net), conn -> sock, EV_WRITE);
    ev_io_start(loop, &(conn -> io_net));

    fprintf(stderr, "%p async_callback/work completed: %p\n", (void*)pthread_self(), (void *)conn);
}
/*
* Enqueue callbaxk
*/
static void queue_callback(struct ev_loop * loop,
struct ev_io * watcher,
int rev)
{
IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
if (rev & EV_ERROR) { fprintf(stderr, "queue_callback/EV_ERROR\n"); return; }
if (rev & EV_WRITE)
{
int bytes;
ev_io_stop(loop, watcher);
ev_async_start(loop, &(io_ctx -> async));
bytes = send(watcher -> fd, &io_ctx, sizeof(IO_CTX *), 0);
if (bytes != sizeof(IO_CTX *))
{
close(watcher -> fd);
free(io_ctx);
return;
}
fprintf(stderr, "%p queue_callback/%p\n", (void*)pthread_self(), (void *)io_ctx);
}
}
/*
 * Input/output callback for a client socket.
 *
 * EV_READ:  receives one chunk of data into the connection buffer and
 *           starts the io_queue watcher so the request is passed to a
 *           worker thread.
 * EV_WRITE: sends the buffer back to the client and switches the watcher
 *           back to EV_READ.
 *
 * loop    - event loop that owns the watcher
 * watcher - io_net watcher; fd is the client socket, data points to the
 *           IO_CTX
 * rev     - received event mask
 *
 * The client socket is non-blocking, so recv()/send() may fail with
 * EAGAIN/EWOULDBLOCK/EINTR; in that case the watcher is simply re-armed
 * instead of dropping the connection. Any other failure (or EOF) closes
 * the socket and frees the context. A short send is not retried.
 */
static void client_callback(struct ev_loop * loop,
                            struct ev_io * watcher,
                            int rev)
{
    IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);

    if (rev & EV_ERROR) { fprintf(stderr, "client_callback/EV_ERROR\n"); return; }

    if (rev & EV_READ)
    {
        ssize_t got;
        int err;

        ev_io_stop(loop, watcher);
        got = recv(watcher -> fd, io_ctx -> buf, sizeof(io_ctx -> buf) - 1, 0);
        /* Save errno before fprintf() has a chance to overwrite it */
        err = errno;
        io_ctx -> buf_len = (int)got;
        fprintf(stderr, "%p client_callback/recv: %d\n", (void*)pthread_self(), io_ctx -> buf_len);

        if (got < 0 && (err == EAGAIN || err == EWOULDBLOCK || err == EINTR))
        {
            /* Nothing to read right now: keep waiting for input */
            io_ctx -> buf_len = 0;
            ev_io_start(loop, watcher);
            return;
        }
        if (got <= 0)
        {
            close(watcher -> fd);
            free(io_ctx);
            return;
        }
        ev_io_start(loop, &(io_ctx -> io_queue));
    }

    if (rev & EV_WRITE)
    {
        ssize_t sent;
        int err;

        ev_io_stop(loop, watcher);
        sent = send(watcher -> fd, io_ctx -> buf, io_ctx -> buf_len, 0);
        err = errno;
        fprintf(stderr, "%p client_callback/send: %d\n", (void*)pthread_self(), (int)sent);

        if (sent < 0 && (err == EAGAIN || err == EWOULDBLOCK || err == EINTR))
        {
            /* Socket not writable after all: wait for the next EV_WRITE */
            ev_io_start(loop, watcher);
            return;
        }
        if (sent <= 0)
        {
            close(watcher -> fd);
            free(io_ctx);
            return;
        }
        /* Response delivered: go back to waiting for the next request */
        ev_io_set(watcher, watcher -> fd, EV_READ);
        ev_io_start(loop, watcher);
    }
}
/*
 * New connection callback.
 *
 * Invoked when the listening socket is readable. Accepts one client,
 * switches its socket to non-blocking mode, allocates the connection
 * context and starts watching the client socket for EV_READ.
 *
 * loop    - event loop that owns the watcher
 * watcher - listener watcher; fd is the listening socket, data points to
 *           the NET_CTX
 * rev     - received event mask
 *
 * Every failure path closes the accepted socket and returns without
 * creating a context.
 */
static void accept_callback(struct ev_loop * loop,
                            struct ev_io * watcher,
                            int rev)
{
    struct sockaddr_in sock_addr;
    socklen_t sock_addr_len = sizeof(sock_addr);
    int client_sock;
    char client_ip[INET_ADDRSTRLEN];
    IO_CTX * io_ctx = NULL;
    int tmp;
    int rc;

    if (rev & EV_ERROR) { fprintf(stderr, "accept_callback/EV_ERROR\n"); return; }

    memset(&sock_addr, 0, sock_addr_len);
    client_sock = accept(watcher -> fd, (struct sockaddr *)&sock_addr, &sock_addr_len);
    if (client_sock == -1) { fprintf(stderr, "accept_callback/accept\n"); return; }

    if (inet_ntop(AF_INET, &sock_addr.sin_addr, client_ip, sizeof(client_ip)) == NULL) { close(client_sock); return; }

    tmp = 1;
    rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
    if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return; }

    tmp = 1;
    rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
    if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return; }

    /* Read the current flags first; do not OR a -1 error value into F_SETFL */
    rc = fcntl(client_sock, F_GETFL, 0);
    if (rc != -1) { rc = fcntl(client_sock, F_SETFL, rc | O_NONBLOCK); }
    if (rc == -1) { close(client_sock); fprintf(stderr, "fcntl/O_NONBLOCK\n"); return; }

    fprintf(stderr, "accept_callback/accept: %s:%d\n", client_ip, ntohs(sock_addr.sin_port));

    /* calloc gives a zero-filled context; bail out cleanly when out of memory */
    io_ctx = calloc(1, sizeof(*io_ctx));
    if (io_ctx == NULL) { close(client_sock); fprintf(stderr, "accept_callback/calloc\n"); return; }

    io_ctx -> net_ctx = (NET_CTX *)(watcher -> data);
    io_ctx -> loop = loop;
    io_ctx -> sock = client_sock;

    /* Signalled by the worker thread when the request has been processed */
    ev_async_init(&(io_ctx -> async), async_callback);
    io_ctx -> async.data = io_ctx;

    /* Prepared but not started: client_callback starts it after a recv */
    ev_init(&(io_ctx -> io_queue), queue_callback);
    ev_io_set(&(io_ctx -> io_queue), io_ctx -> net_ctx -> wrk_ctx -> out_queue, EV_WRITE);
    io_ctx -> io_queue.data = io_ctx;

    /* Start by waiting for data from the client */
    ev_init(&(io_ctx -> io_net), client_callback);
    ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_READ);
    io_ctx -> io_net.data = io_ctx;
    ev_io_start(loop, &(io_ctx -> io_net));
}
/*
 * Network thread function.
 *
 * Creates a private event loop, watches the shared listening socket for
 * incoming connections and runs the loop until it exits.
 *
 * thr_ctx - pointer to the NET_CTX shared by the network threads
 *
 * Always returns NULL; if the event loop cannot be created the thread
 * logs the failure and exits immediately.
 */
static void * network_thr_func(void * thr_ctx)
{
    ev_io srv_io;
    NET_CTX * net_ctx = (NET_CTX *)thr_ctx;
    struct ev_loop * loop = ev_loop_new(EVFLAG_AUTO);

    if (loop == NULL)
    {
        fprintf(stderr, "network_thr_func/ev_loop_new\n");
        return NULL;
    }

    ev_init(&srv_io, accept_callback);
    ev_io_set(&srv_io, net_ctx -> sock, EV_READ);
    /* accept_callback reads the network context back from the watcher */
    srv_io.data = net_ctx;
    ev_io_start(loop, &srv_io);

    ev_loop(loop, 0);

    ev_loop_destroy(loop);
    return NULL;
}
/*
 * Worker thread function.
 *
 * Waits on the incoming queue socket, receives connection context pointers
 * sent by queue_callback, processes the connection buffer and notifies the
 * owning event loop through the connection's async watcher.
 *
 * thr_ctx - pointer to the shared WRK_CTX
 *
 * Always returns NULL. The thread ends when poll() fails with anything
 * other than EINTR, or when a queue message does not have the size of a
 * pointer.
 */
static void * worker_thr_func(void * thr_ctx)
{
    WRK_CTX * wrk_ctx = (WRK_CTX *)thr_ctx;
    IO_CTX * io_ctx = NULL;
    struct pollfd fds;
    int bytes;
    int rc;
    int pos;

    fds.fd = wrk_ctx -> in_queue;
    fds.events = POLLIN;

    for(;;)
    {
        fds.revents = 0;
        rc = poll(&fds, 1, 1000);
        if (rc < 0)
        {
            /* An interrupted wait is not an error: just poll again */
            if (errno == EINTR) { continue; }
            fprintf(stderr, "%p Error: %d\n", (void*)pthread_self(), errno);
            return NULL;
        }
        /* Timeout: nothing queued yet */
        if (rc == 0) { continue; }

        bytes = (int)recv(fds.fd, &io_ctx, sizeof(IO_CTX *), 0);
        fprintf(stderr, "%p worker_thr/recv: %d\n", (void*)pthread_self(), bytes);
        /* Impossible happened? */
        if (bytes != sizeof(IO_CTX *)) { return NULL; }
        fprintf(stderr, "From queue %p\n", (void*)io_ctx);

        /* Do a big slow work here: replace every second printable byte */
        fprintf(stderr, "%p Handler: %d `%.*s`\n", (void*)pthread_self(), io_ctx -> buf_len, io_ctx -> buf_len, io_ctx -> buf);
        for (pos = 0; pos < io_ctx -> buf_len; pos += 2)
        {
            if (io_ctx -> buf[pos] > ' ') { io_ctx -> buf[pos] = '_'; }
        }
        sleep(1);

        /* Send notification to the network thread */
        ev_async_send(io_ctx -> loop, &(io_ctx -> async));
    }
    return NULL;
}
/*
 * Create the server listening socket.
 *
 * Creates a TCP/IPv4 socket bound to port 7788 on all interfaces, with
 * SO_REUSEADDR and SO_REUSEPORT set, marked close-on-exec and
 * non-blocking, and puts it into listening state (backlog 128).
 *
 * Returns the socket descriptor, or -1 on any failure (the socket, if
 * created, is closed and a short message is written to stderr).
 */
static int make_sock(void)
{
    struct sockaddr_in sock_addr;
    int tmp;
    int rc;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    /* The caller only recognises -1 as failure */
    if (sock == -1) { fprintf(stderr, "socket\n"); return -1; }

    /* Zero the whole address so no field is left uninitialised */
    memset(&sock_addr, 0, sizeof(sock_addr));
    sock_addr.sin_family = AF_INET;
    sock_addr.sin_port = htons(7788);
    sock_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    tmp = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
    if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return -1; }

    tmp = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
    if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return -1; }

    /* FD_CLOEXEC is a descriptor flag: it is set with F_SETFD, not F_SETFL */
    rc = fcntl(sock, F_GETFD, 0);
    if (rc != -1) { rc = fcntl(sock, F_SETFD, rc | FD_CLOEXEC); }
    if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC\n"); return -1; }

    /* O_NONBLOCK is a file status flag: it is set with F_SETFL */
    rc = fcntl(sock, F_GETFL, 0);
    if (rc != -1) { rc = fcntl(sock, F_SETFL, rc | O_NONBLOCK); }
    if (rc == -1) { close(sock); fprintf(stderr, "fcntl/O_NONBLOCK\n"); return -1; }

    rc = bind(sock, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in));
    if (rc == -1) { close(sock); fprintf(stderr, "bind\n"); return -1; }

    rc = listen(sock, 128);
    if (rc == -1) { close(sock); fprintf(stderr, "listen\n"); return -1; }

    return sock;
}
/* Number of network threads started by main(); default applies unless
   C_NETWORK_THR is already defined (e.g. on the compiler command line) */
#ifndef C_NETWORK_THR
#define C_NETWORK_THR 2
#endif
/* Number of worker threads started by main(); default applies unless
   C_WORKER_THR is already defined */
#ifndef C_WORKER_THR
#define C_WORKER_THR 5
#endif
/*
 * Program entry point.
 *
 * Creates the queue socket pair and the listening socket, starts
 * C_WORKER_THR worker threads and C_NETWORK_THR network threads, then
 * waits for all of them to finish.
 *
 * Returns 0 after all threads have been joined, EX_SOFTWARE if a socket
 * could not be set up, EX_OSERR if a thread could not be created.
 * Command line arguments are not used.
 */
int main(int argc, char **argv)
{
    int sockpair[2];
    int thr;
    int rc;
    WRK_CTX wrk_ctx;
    NET_CTX net_ctx;
    pthread_t worker_thr[C_WORKER_THR];
    pthread_t network_thr[C_NETWORK_THR];

    (void)argc;
    (void)argv;

    if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockpair) == -1) { fprintf(stderr, "socketpair\n"); return EX_SOFTWARE; }
    wrk_ctx.out_queue = sockpair[0];
    wrk_ctx.in_queue = sockpair[1];

    net_ctx.wrk_ctx = &wrk_ctx;
    net_ctx.sock = make_sock();
    if (net_ctx.sock == -1)
    {
        close(sockpair[0]);
        close(sockpair[1]);
        return EX_SOFTWARE;
    }

    /* Worker threads */
    for (thr = 0; thr < C_WORKER_THR; ++thr)
    {
        rc = pthread_create(&worker_thr[thr], NULL, worker_thr_func, &wrk_ctx);
        /* Joining a thread that was never created is undefined: give up now */
        if (rc != 0) { fprintf(stderr, "pthread_create/worker: %s\n", strerror(rc)); return EX_OSERR; }
    }

    /* Network threads */
    for (thr = 0; thr < C_NETWORK_THR; ++thr)
    {
        rc = pthread_create(&network_thr[thr], NULL, network_thr_func, &net_ctx);
        if (rc != 0) { fprintf(stderr, "pthread_create/network: %s\n", strerror(rc)); return EX_OSERR; }
    }

    for (thr = 0; thr < C_NETWORK_THR; ++thr) { pthread_join(network_thr[thr], NULL); }
    for (thr = 0; thr < C_WORKER_THR; ++thr) { pthread_join(worker_thr[thr], NULL); }

    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment