Skip to content

Instantly share code, notes, and snippets.

@cybertxt
Last active December 11, 2019 10:04
Show Gist options
  • Save cybertxt/7866965946c3952a5eb00f70b3fbfef3 to your computer and use it in GitHub Desktop.
Save cybertxt/7866965946c3952a5eb00f70b3fbfef3 to your computer and use it in GitHub Desktop.
libuv works with one acceptor thread and multiple worker threads sample. This sample works ONLY on UNIX-like system.
#include <uv.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define WORKER_NUM 10
uv_thread_t worker_tid[WORKER_NUM];
uv_async_t async[WORKER_NUM];
uv_mutex_t mutex[WORKER_NUM];
uv_cond_t cond[WORKER_NUM];
int passing_fd[WORKER_NUM];
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
void echo_write(uv_write_t *req, int status) {
if (status)
printf("write error %s\n", uv_err_name(status));
free(req);
}
void client_on_close(uv_handle_t* handle) {
free(handle);
}
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*) client, &fd);
if (nread < 0) {
if (nread != UV_EOF)
printf("read error %s\n", uv_err_name(nread));
printf("%d is going to be closed\n", fd);
uv_close((uv_handle_t*) client, client_on_close);
if (buf->base)
free(buf->base);
return;
}
printf("read %d bytes from %d\n", nread, fd);
uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
uv_write(req, client, &wrbuf, 1, echo_write);
free(buf->base);
}
void async_cb(uv_async_t* handle) {
int i = (int)handle->data;
uv_mutex_lock(&mutex[i]);
if (passing_fd[i] == -1) {
uv_mutex_unlock(&mutex[i]);
printf("invalid fd\n");
return;
}
uv_tcp_t* htcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(handle->loop, htcp);
printf("thread %d will take fd: %d\n", i, passing_fd[i]);
uv_tcp_open(htcp, passing_fd[i]);
passing_fd[i] = -1;
uv_cond_signal(&cond[i]);
uv_mutex_unlock(&mutex[i]);
uv_read_start((uv_stream_t*)htcp, alloc_buffer, echo_read);
}
void worker_thread(void* args) {
int i = (int)args;
uv_loop_t loop;
uv_loop_init(&loop);
uv_async_init(&loop, &async[i], async_cb);
async[i].data = (void*)i;
uv_run(&loop, UV_RUN_DEFAULT);
}
void new_connection(uv_stream_t *server, int status) {
static int i = 0;
int j;
if (status == -1) {
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*) client, &fd);
printf("accepted %d\n", fd);
j = i % WORKER_NUM;
uv_mutex_lock(&mutex[j]);
passing_fd[j] = dup(fd);
printf("dup new fd: %d\n", passing_fd[j]);
uv_close((uv_handle_t*)client, client_on_close);
uv_async_send(&async[j]);
while(passing_fd[j] != -1)
uv_cond_wait(&cond[j], &mutex[j]);
uv_mutex_unlock(&mutex[j]);
i++;
}
}
int main() {
int ret;
uv_tcp_t listen_handle;
uv_tcp_init(uv_default_loop(), &listen_handle);
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", 9987, &bind_addr);
uv_tcp_bind(&listen_handle, (const struct sockaddr *)&bind_addr, 0);
if ((ret = uv_listen((uv_stream_t*) &listen_handle, 128, new_connection)) != 0) {
fprintf(stderr, "listen error %s\n", uv_err_name(ret));
return 1;
}
int i = 0;
for (; i < WORKER_NUM; i++) {
passing_fd[i] = -1;
uv_mutex_init(&mutex[i]);
uv_cond_init(&cond[i]);
uv_thread_create(&worker_tid[i], worker_thread, (void*)i);
}
return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
import socket
import sys
i = 0
socks=[]
while i < 200:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 9987)
sock.connect(server_address)
socks.append(sock)
sock.sendall(str(i))
i = i + 1
for s in socks:
data = s.recv(100)
print(data)
s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment