Created
November 4, 2016 21:07
-
-
Save apofiget/eef3f7b6734d4ffc4e56148eeebe9d8f to your computer and use it in GitHub Desktop.
Libev complete example, http://slonik-v-domene.livejournal.com
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <arpa/inet.h> | |
#include <errno.h> | |
#include <ev.h> | |
#include <fcntl.h> | |
#include <pthread.h> | |
#include <poll.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sysexits.h> | |
#include <unistd.h> | |
/* | |
* Worker thread context | |
*/ | |
typedef struct _wrk_ctx | |
{ | |
/* Outgoing queue socket */ | |
int out_queue; | |
/* Incoming queue socket */ | |
int in_queue; | |
} WRK_CTX; | |
/* | |
* Network thread context | |
*/ | |
typedef struct _net_ctx | |
{ | |
/* Server listener */ | |
int sock; | |
/* Worker context */ | |
WRK_CTX * wrk_ctx; | |
} NET_CTX; | |
/* | |
* Client connection context | |
*/ | |
typedef struct _io_ctx | |
{ | |
/* Network context */ | |
NET_CTX * net_ctx; | |
/* I/O context */ | |
struct ev_loop * loop; | |
/* Client socket */ | |
int sock; | |
/* I/O context */ | |
struct ev_io io_queue; | |
/* I/O context */ | |
struct ev_io io_net; | |
/* Async watcher */ | |
struct ev_async async; | |
/* Buffer */ | |
char buf[4096]; | |
/* Buffer length */ | |
int buf_len; | |
} IO_CTX; | |
/* | |
* Async callback | |
*/ | |
static void async_callback(struct ev_loop * loop, | |
struct ev_async * watcher, | |
int rev) | |
{ | |
IO_CTX * io_ctx = (IO_CTX *)(watcher -> data); | |
if (rev & EV_ERROR) { fprintf(stderr, "async_callback/EV_ERROR\n"); return; } | |
ev_async_stop(loop, watcher); | |
ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_WRITE); | |
ev_io_start(loop, &(io_ctx -> io_net)); | |
fprintf(stderr, "%p async_callback/work completed: %p\n", (void*)pthread_self(), (void *)io_ctx); | |
} | |
/* | |
* Enqueue callbaxk | |
*/ | |
static void queue_callback(struct ev_loop * loop, | |
struct ev_io * watcher, | |
int rev) | |
{ | |
IO_CTX * io_ctx = (IO_CTX *)(watcher -> data); | |
if (rev & EV_ERROR) { fprintf(stderr, "queue_callback/EV_ERROR\n"); return; } | |
if (rev & EV_WRITE) | |
{ | |
int bytes; | |
ev_io_stop(loop, watcher); | |
ev_async_start(loop, &(io_ctx -> async)); | |
bytes = send(watcher -> fd, &io_ctx, sizeof(IO_CTX *), 0); | |
if (bytes != sizeof(IO_CTX *)) | |
{ | |
close(watcher -> fd); | |
free(io_ctx); | |
return; | |
} | |
fprintf(stderr, "%p queue_callback/%p\n", (void*)pthread_self(), (void *)io_ctx); | |
} | |
} | |
/* | |
* Input/output callbaxk | |
*/ | |
static void client_callback(struct ev_loop * loop, | |
struct ev_io * watcher, | |
int rev) | |
{ | |
IO_CTX * io_ctx = (IO_CTX *)(watcher -> data); | |
if (rev & EV_ERROR) { fprintf(stderr, "client_callback/EV_ERROR\n"); return; } | |
if (rev & EV_READ) | |
{ | |
ev_io_stop(loop, watcher); | |
io_ctx -> buf_len = recv(watcher -> fd, io_ctx -> buf, 4095, 0); | |
fprintf(stderr, "%p client_callback/recv: %d\n", (void*)pthread_self(), io_ctx -> buf_len); | |
if (io_ctx -> buf_len <= 0) | |
{ | |
close(watcher -> fd); | |
free(io_ctx); | |
return; | |
} | |
ev_io_start(loop, &(io_ctx -> io_queue)); | |
} | |
if (rev & EV_WRITE) | |
{ | |
int bytes; | |
ev_io_stop(loop, watcher); | |
bytes = send(watcher -> fd, io_ctx -> buf, io_ctx -> buf_len, 0); | |
fprintf(stderr, "%p client_callback/send: %d\n", (void*)pthread_self(), bytes); | |
if (bytes <= 0) | |
{ | |
close(watcher -> fd); | |
free(io_ctx); | |
return; | |
} | |
ev_io_set(watcher, watcher -> fd, EV_READ); | |
ev_io_start(loop, watcher); | |
} | |
} | |
/* | |
* New connection callback | |
*/ | |
static void accept_callback(struct ev_loop * loop, | |
struct ev_io * watcher, | |
int rev) | |
{ | |
struct sockaddr_in sock_addr; | |
socklen_t sock_addr_len = sizeof(sock_addr); | |
int client_sock; | |
char client_ip[16]; | |
IO_CTX * io_ctx = NULL; | |
int tmp; | |
int rc; | |
if (rev & EV_ERROR) { fprintf(stderr, "accept_callback/EV_ERROR\n"); return; } | |
memset(&sock_addr, 0, sock_addr_len); | |
client_sock = accept(watcher -> fd, (struct sockaddr *)&sock_addr, &sock_addr_len); | |
if (client_sock == -1) { fprintf(stderr, "accept_callback/accept\n"); return; } | |
if (inet_ntop(AF_INET, &sock_addr.sin_addr, client_ip, 16) == NULL) { close(client_sock); return; } | |
tmp = 1; | |
rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int)); | |
if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return; } | |
tmp = 1; | |
rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int)); | |
if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return; } | |
rc = fcntl(client_sock, F_SETFL, fcntl(client_sock, F_GETFL, 0) | O_NONBLOCK); | |
if (rc == -1) { close(client_sock); fprintf(stderr, "fcntl/O_NONBLOCK\n"); return; } | |
fprintf(stderr, "accept_callback/accept: %s:%d\n", client_ip, ntohs(sock_addr.sin_port)); | |
io_ctx = (IO_CTX *)malloc(sizeof(IO_CTX)); | |
memset(io_ctx, 0, sizeof(IO_CTX)); | |
io_ctx -> net_ctx = (NET_CTX *)(watcher -> data); | |
io_ctx -> loop = loop; | |
io_ctx -> sock = client_sock; | |
ev_async_init(&(io_ctx -> async), async_callback); | |
io_ctx -> async.data = io_ctx; | |
ev_init(&(io_ctx -> io_queue), queue_callback); | |
ev_io_set(&(io_ctx -> io_queue), io_ctx -> net_ctx -> wrk_ctx -> out_queue, EV_WRITE); | |
io_ctx -> io_queue.data = io_ctx; | |
ev_init(&(io_ctx -> io_net), client_callback); | |
ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_READ); | |
io_ctx -> io_net.data = io_ctx; | |
ev_io_start(loop, &(io_ctx -> io_net)); | |
} | |
/* | |
* Network thread function | |
*/ | |
static void * network_thr_func(void * thr_ctx) | |
{ | |
ev_io srv_io; | |
NET_CTX * net_ctx = (NET_CTX *)thr_ctx; | |
struct ev_loop * loop = ev_loop_new(EVFLAG_AUTO); | |
ev_init(&srv_io, accept_callback); | |
ev_io_set(&srv_io, net_ctx -> sock, EV_READ); | |
srv_io.data = (NET_CTX *)thr_ctx; | |
ev_io_start(loop, &srv_io); | |
ev_loop(loop, 0); | |
ev_loop_destroy(loop); | |
return NULL; | |
} | |
/* | |
* Worker thread function | |
*/ | |
static void * worker_thr_func(void * thr_ctx) | |
{ | |
WRK_CTX * wrk_ctx = (WRK_CTX *)thr_ctx; | |
IO_CTX * io_ctx = NULL; | |
struct pollfd fds; | |
int bytes; | |
int rc; | |
fds.fd = wrk_ctx -> in_queue; | |
fds.events = POLLIN; | |
for(;;) | |
{ | |
fds.revents = 0; | |
rc = poll(&fds, 1, 1000); | |
if (rc < 0) | |
{ | |
fprintf(stderr, "%p Error: %d", (void*)pthread_self(), errno); | |
return NULL; | |
} | |
if (rc == 0) { /* fprintf(stderr, "Timeout\n"); */ } | |
else | |
{ | |
bytes = recv(fds.fd, &io_ctx, sizeof(IO_CTX *), 0); | |
fprintf(stderr, "%p worker_thr/recv: %d\n", (void*)pthread_self(), bytes); | |
/* Impossible happened? */ | |
if (bytes != sizeof(IO_CTX *)) { return NULL; } | |
fprintf(stderr, "From queue %p\n", (void*)io_ctx); | |
/* Do a big slow work here //////// */ | |
do | |
{ | |
int pos = 0; | |
fprintf(stderr, "%p Handler: %d `%.*s`\n", (void*)pthread_self(), io_ctx -> buf_len, io_ctx -> buf_len, io_ctx -> buf); | |
for (pos = 0; pos < io_ctx -> buf_len; pos += 2) | |
{ | |
if (io_ctx -> buf[pos] > ' ') { io_ctx -> buf[pos] = '_'; } | |
} | |
sleep(1); | |
} | |
while(0); | |
/* /////////////////////////////// */ | |
/* Send notification to the network thread */ | |
ev_async_send(io_ctx -> loop, &(io_ctx -> async)); | |
} | |
} | |
return NULL; | |
} | |
static int make_sock() | |
{ | |
struct sockaddr_in sock_addr; | |
int tmp; | |
int rc; | |
int sock = socket(AF_INET, SOCK_STREAM, 0); | |
if (sock == -1) { fprintf(stderr, "socket\n"); return EX_SOFTWARE; } | |
sock_addr.sin_family = AF_INET; | |
sock_addr.sin_port = htons(7788); | |
sock_addr.sin_addr.s_addr = htonl(INADDR_ANY); | |
tmp = 1; | |
rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int)); | |
if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return -1; } | |
tmp = 1; | |
rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int)); | |
if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return -1; } | |
rc = fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | FD_CLOEXEC | O_NONBLOCK); | |
if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; } | |
rc = bind(sock, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in)); | |
if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; } | |
rc = listen(sock, 128); | |
if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; } | |
return sock; | |
} | |
#ifndef C_NETWORK_THR | |
#define C_NETWORK_THR 2 | |
#endif | |
#ifndef C_WORKER_THR | |
#define C_WORKER_THR 5 | |
#endif | |
int main(int argc, char **argv) | |
{ | |
int sockpair[2]; | |
int thr; | |
WRK_CTX wrk_ctx; | |
NET_CTX net_ctx; | |
pthread_t worker_thr[C_WORKER_THR]; | |
pthread_t network_thr[C_NETWORK_THR]; | |
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockpair) == -1) { fprintf(stderr, "socketpair\n"); return EX_SOFTWARE; } | |
wrk_ctx.out_queue = sockpair[0]; | |
wrk_ctx.in_queue = sockpair[1]; | |
net_ctx.wrk_ctx = &wrk_ctx; | |
net_ctx.sock = make_sock(); | |
if (net_ctx.sock == -1) { return EX_SOFTWARE; } | |
/* Worker threads */ | |
for(thr = 0; thr < C_WORKER_THR; ++thr) { pthread_create(&worker_thr[thr], NULL, worker_thr_func, &wrk_ctx); } | |
/* Network threads */ | |
for(thr= 0; thr < C_NETWORK_THR; ++thr) { pthread_create(&network_thr[thr], NULL, network_thr_func, &net_ctx); } | |
for(thr = 0; thr < C_NETWORK_THR; ++thr) { pthread_join(network_thr[thr], NULL); } | |
for(thr = 0; thr < C_WORKER_THR; ++thr) { pthread_join(worker_thr[thr], NULL); } | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment