@sandersaares
Created March 31, 2025 08:18
Structural changes for improved throughput
use std::sync::LazyLock;

pub fn is_forbidden_text_static(s: &str) -> bool {
    FORBIDDEN_TEXTS
        .iter()
        .any(|candidate| s.starts_with(candidate))
}

pub static FORBIDDEN_TEXTS: LazyLock<Vec<String>> = LazyLock::new(generate_forbidden_texts);
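The snippets reference generate_forbidden_texts() without showing it. Purely as a hypothetical stand-in so the example compiles (the real generator is elided from the gist), it could be something like:

// Hypothetical placeholder; the real generator is not shown in the gist.
fn generate_forbidden_texts() -> Vec<String> {
    // Assume the real implementation produces a large list of forbidden prefixes.
    (0..10_000).map(|i| format!("forbidden-{i}")).collect()
}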
import http from 'k6/http';

export default function () {
    let params = {
        timeout: "10000ms",
    };

    let payload = '.. elided for snippet brevity ..';

    http.post('http://10.0.0.8:1234/check', payload, params);
}
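Assuming the script is saved as check.js (a name chosen here for illustration), it can be driven with the standard k6 CLI; the VU count and duration below are illustrative, not taken from the gist:

k6 run --vus 64 --duration 60s check.js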
use std::net::SocketAddr;

use axum::{http::StatusCode, response::IntoResponse, routing::post, Router};

#[tokio::main]
async fn main() {
    let app = Router::new().route("/check", post(check));

    let addr = SocketAddr::from(([0, 0, 0, 0], 1234));
    println!("Server running on http://{}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();

    axum::serve(listener, app.into_make_service())
        .await
        .unwrap();
}

async fn check(body: String) -> impl IntoResponse {
    if is_forbidden_text_static(&body) {
        (StatusCode::OK, "true")
    } else {
        (StatusCode::OK, "false")
    }
}
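The server snippet assumes roughly the following dependencies; the versions here are illustrative and not taken from the gist:

[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }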
#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 1234));

    let num_workers = num_cpus::get();
    let mut work_txs = Vec::with_capacity(num_workers);

    for _ in 0..num_workers {
        const WORKER_QUEUE_SIZE: usize = 4;
        let (tx, rx) = channel(WORKER_QUEUE_SIZE);
        work_txs.push(tx);

        thread::spawn(move || worker_entrypoint(rx));
    }

    listener_entrypoint(addr, work_txs).await;
}

fn worker_entrypoint(mut rx: Receiver<TcpStream>) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async move {
        // We build a new Axum app on every worker, ensuring that workers are independent.
        let app = Router::new().route("/check", post(check));
        let service_factory = app.into_make_service_with_connect_info::<SocketAddr>();

        while let Some(stream) = rx.recv().await {
            let peer_addr = stream.peer_addr().unwrap();

            // For each connection, we spawn a new task to handle it.
            tokio::spawn({
                let mut service_factory = service_factory.clone();

                async move {
                    let service = service_factory.call(peer_addr).await.unwrap();
                    let hyper_service = TowerToHyperService::new(service);
                    let http = hyper::server::conn::http1::Builder::new();

                    // We do not care if the request handling succeeds or fails, so ignore result.
                    _ = http
                        .serve_connection(TokioIo::new(stream), hyper_service)
                        .await;
                }
            });
        }
    });
}
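The per-worker design above relies on a listener_entrypoint that is referenced but not shown here. Purely as a sketch of one way it could be wired up (the actual implementation may differ), assuming accepted connections are handed out round-robin over the bounded per-worker channels:

// Illustrative sketch only; not taken from the gist.
async fn listener_entrypoint(addr: SocketAddr, work_txs: Vec<Sender<TcpStream>>) {
    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
    println!("Server running on http://{}", addr);

    let mut next_worker = 0;

    loop {
        let (stream, _) = listener.accept().await.unwrap();

        // Hand the connection to a worker; a full queue applies backpressure here.
        if work_txs[next_worker].send(stream).await.is_err() {
            // The worker has shut down; stop accepting.
            return;
        }

        next_worker = (next_worker + 1) % work_txs.len();
    }
}

One thing this sketch glosses over is that the accepted tokio sockets remain registered with the listener's runtime rather than the worker's; the gist's real listener may handle that handover differently.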
let all_processors = ProcessorSet::all();
let num_workers = all_processors.len();
let mut work_txs = Vec::with_capacity(num_workers);

for _ in 0..num_workers {
    const WORKER_QUEUE_SIZE: usize = 4;
    let (tx, rx) = channel(WORKER_QUEUE_SIZE);
    work_txs.push(tx);

    // In each loop iteration, we spawn a new worker thread that the OS is allowed to assign
    // to any of the processors in the set to balance load among them. This is almost entirely
    // equivalent to `thread::spawn()`, except by using `ProcessorSet::all()` we ensure that
    // all processors are made available for these threads on Windows, even on many-processor
    // systems with multiple processor groups where threads can otherwise be limited to only
    // one processor group.
    all_processors.spawn_thread(move |_| worker_entrypoint(rx));
}

region_cached! {
    pub static FORBIDDEN_TEXTS_REGION_CACHED: Vec<String> = generate_forbidden_texts();
}

pub fn is_forbidden_text_region_cached(s: &str) -> bool {
    FORBIDDEN_TEXTS_REGION_CACHED
        .with_cached(|texts| texts.iter().any(|candidate| s.starts_with(candidate)))
}
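The HTTP handler only needs to switch to the region-cached lookup; for example, the earlier check handler would become:

// Same handler as before, now using the region-cached lookup.
async fn check(body: String) -> impl IntoResponse {
    if is_forbidden_text_region_cached(&body) {
        (StatusCode::OK, "true")
    } else {
        (StatusCode::OK, "false")
    }
}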
let all_processors = ProcessorSet::all();
let num_workers = all_processors.len();

const WORKER_QUEUE_SIZE: usize = 4;

let (txs, rxs) = (0..num_workers)
    .map(|_| channel(WORKER_QUEUE_SIZE))
    .unzip::<_, _, Vec<_>, Vec<_>>();

// Each worker thread will pop one Receiver out of this vector, so it needs to be shared.
let rxs = Arc::new(Mutex::new(rxs));

// This method will create one thread per processor and execute the callback on each of them.
// Every thread will be pinned to a specific processor, so the OS will not move them around.
all_processors.spawn_threads(move |_| {
    let rx = {
        let mut rxs = rxs.lock().unwrap();
        rxs.pop().unwrap()
    };

    worker_entrypoint(rx);
});
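Judging by ProcessorSet and the region_cached! macro, these last snippets appear to use the many_cpus and region_cached crates; if so, the extra dependencies would look roughly like this (crate names and versions are assumptions, not stated in the gist):

[dependencies]
many_cpus = "*"       # ProcessorSet and processor-group-aware thread spawning (assumed)
region_cached = "*"   # region_cached! macro and with_cached() (assumed)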