Skip to content

Instantly share code, notes, and snippets.

@jcdkuo
Created June 24, 2025 07:54
Show Gist options
  • Save jcdkuo/520d3efeffba43aa47faba9679570587 to your computer and use it in GitHub Desktop.
Save jcdkuo/520d3efeffba43aa47faba9679570587 to your computer and use it in GitHub Desktop.
#include <re.h>
#include <pthread.h>
#include <sys/un.h>
#include "config.h"
struct client_ctx {
int fd;
struct fd *fdh;
struct tmr timeout;
int retry;
bool done;
};
static void on_response(int flags, void *arg);
static void timeout_handler(void *arg) {
struct client_ctx *ctx = arg;
if (!ctx->done) {
warning("Client: Timeout on attempt %d\n", ctx->retry + 1);
close(ctx->fd);
if (++ctx->retry >= CLIENT_MAX_RETRIES) {
warning("Client: Max retries reached. Giving up.\n");
re_cancel();
} else {
uint32_t backoff = (1 << ctx->retry) * 1000;
if (backoff > CLIENT_MAX_BACKOFF_MS)
backoff = CLIENT_MAX_BACKOFF_MS;
tmr_start(&ctx->timeout, backoff, (tmr_h *)timeout_handler, ctx);
}
}
}
static void on_response(int flags, void *arg) {
(void)flags;
struct client_ctx *ctx = arg;
char buf[512] = {0};
int n = read(ctx->fd, buf, sizeof(buf));
if (n > 0) {
info("Client received: %.*s\n", n, buf);
ctx->done = true;
tmr_cancel(&ctx->timeout);
close(ctx->fd);
re_cancel();
}
}
static void *client_worker(void *arg) {
struct client_ctx ctx = {0};
ctx.retry = 0;
for (;;) {
struct sockaddr_un sa = {0};
sa.sun_family = AF_UNIX;
strcpy(sa.sun_path, "/tmp/libre.sock");
ctx.fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (connect(ctx.fd, (struct sockaddr*)&sa, sizeof(sa)) == 0) {
write(ctx.fd, "ping", 4);
tmr_start(&ctx.timeout, CLIENT_TIMEOUT_MS, timeout_handler, &ctx);
fd_listen(&ctx.fdh, ctx.fd, FD_READ, on_response, &ctx);
re_main(NULL); // block until done or timeout
break;
} else {
warning("Client: connect failed (retry %d)\n", ctx.retry + 1);
close(ctx.fd);
if (++ctx.retry >= CLIENT_MAX_RETRIES)
break;
uint32_t backoff = (1 << ctx.retry) * 1000;
if (backoff > CLIENT_MAX_BACKOFF_MS)
backoff = CLIENT_MAX_BACKOFF_MS;
sys_msleep(backoff);
}
}
return NULL;
}
int main(void) {
libre_init();
pthread_t threads[CLIENT_MAX_CONNECTIONS];
for (int i = 0; i < CLIENT_MAX_CONNECTIONS; ++i)
pthread_create(&threads[i], NULL, client_worker, NULL);
for (int i = 0; i < CLIENT_MAX_CONNECTIONS; ++i)
pthread_join(threads[i], NULL);
libre_close();
return 0;
}
#ifndef CONFIG_H
#define CONFIG_H
// Server settings
#define SERVER_THREAD_MAX 32 // 最大 thread 數
#define IDLE_THREAD_TIMEOUT_MS 60000 // idle thread 超過多久自殺(ms)
// Client settings
#define CLIENT_MAX_CONNECTIONS 10 // 同時啟動幾個 client
#define CLIENT_TIMEOUT_MS 10000 // 每個 client 等待 server 的回應時間
#define CLIENT_MAX_RETRIES 5 // 最多重試次數
#define CLIENT_MAX_BACKOFF_MS 30000 // backoff 最長時間(ms)
#endif
CC=gcc
CFLAGS=-Wall -O2
LIBS=-lre -lpthread
all: server client
server: server_threaded.c config.h
$(CC) $(CFLAGS) -o server server_threaded.c $(LIBS)
client: client_backoff.c config.h
$(CC) $(CFLAGS) -o client client_backoff.c $(LIBS)
clean:
rm -f server client
#!/bin/bash
# run_clients.sh: 啟動多個 client 進行連線測試
echo "啟動 ${CLIENT_MAX_CONNECTIONS:-10} 個 client..."
for i in $(seq 1 ${CLIENT_MAX_CONNECTIONS:-10}); do
./client &
done
wait
echo "所有 client 結束"
#include <re.h>
#include <pthread.h>
#include <sys/un.h>
#include "config.h"
struct worker {
struct thread *t;
bool busy;
time_t last_active;
};
static struct worker workers[SERVER_THREAD_MAX];
static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER;
static int listen_fd;
static struct fd *listen_fdh;
static void client_handler(int cfd);
static void worker_loop(void *arg) {
int cfd = (intptr_t)arg;
client_handler(cfd);
close(cfd);
pthread_mutex_lock(&worker_lock);
for (int i = 0; i < SERVER_THREAD_MAX; ++i) {
if (workers[i].t && workers[i].t->tid == pthread_self()) {
workers[i].busy = false;
workers[i].last_active = time(NULL);
break;
}
}
pthread_mutex_unlock(&worker_lock);
}
static void try_cleanup_idle_workers(void) {
time_t now = time(NULL);
pthread_mutex_lock(&worker_lock);
for (int i = 0; i < SERVER_THREAD_MAX; ++i) {
if (workers[i].t && !workers[i].busy &&
now - workers[i].last_active > IDLE_THREAD_TIMEOUT_MS / 1000) {
thread_cancel(workers[i].t);
mem_deref(workers[i].t);
workers[i].t = NULL;
}
}
pthread_mutex_unlock(&worker_lock);
}
static void dispatch_connection(int cfd) {
pthread_mutex_lock(&worker_lock);
for (int i = 0; i < SERVER_THREAD_MAX; ++i) {
if (!workers[i].t) {
thread_create(&workers[i].t, worker_loop, (void *)(intptr_t)cfd);
workers[i].busy = true;
pthread_mutex_unlock(&worker_lock);
return;
}
}
pthread_mutex_unlock(&worker_lock);
warning("Too many clients! Dropping connection.\n");
close(cfd);
}
static void on_accept(int flags, void *arg) {
(void)flags; (void)arg;
struct sockaddr_un su;
socklen_t len = sizeof(su);
int cfd = accept(listen_fd, (struct sockaddr*)&su, &len);
if (cfd >= 0) {
dispatch_connection(cfd);
try_cleanup_idle_workers();
}
}
static void client_handler(int cfd) {
struct mbuf *mb = mbuf_alloc(512);
struct pollfd pfd = {.fd = cfd, .events = POLLIN};
while (poll(&pfd, 1, -1) > 0) {
if (pfd.revents & POLLIN) {
ssize_t n = read(cfd, mb->buf, 512);
if (n <= 0) break;
write(cfd, "pong", 4);
}
}
mem_deref(mb);
}
int main(void) {
libre_init();
struct sockaddr_un sa = {0};
sa.sun_family = AF_UNIX;
strcpy(sa.sun_path, "/tmp/libre.sock");
unlink(sa.sun_path);
listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
bind(listen_fd, (struct sockaddr*)&sa, sizeof(sa));
listen(listen_fd, 10);
fd_listen(&listen_fdh, listen_fd, FD_READ, on_accept, NULL);
re_main(NULL);
fd_close(listen_fdh);
close(listen_fd);
libre_close();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment