Created
March 31, 2025 08:18
-
-
Save sandersaares/04af4e5a3081ed78bece093c551c0994 to your computer and use it in GitHub Desktop.
Structural changes for improved throughput
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub fn is_forbidden_text_static(s: &str) -> bool { | |
FORBIDDEN_TEXTS | |
.iter() | |
.any(|candidate| s.starts_with(candidate)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub static FORBIDDEN_TEXTS: LazyLock<Vec<String>> = LazyLock::new(generate_forbidden_texts); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http from 'k6/http'; | |
// k6 entry point: each iteration POSTs the payload to the /check endpoint,
// allowing up to 10 seconds for the response.
export default function () {
  const payload = '.. elided for snippet brevity ..';
  const params = { timeout: "10000ms" };

  http.post('http://10.0.0.8:1234/check', payload, params);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[tokio::main] | |
async fn main() { | |
let app = Router::new().route("/check", post(check)); | |
let addr = SocketAddr::from(([0, 0, 0, 0], 1234)); | |
println!("Server running on http://{}", addr); | |
let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); | |
axum::serve(listener, app.into_make_service()) | |
.await | |
.unwrap(); | |
} | |
async fn check(body: String) -> impl IntoResponse { | |
if is_forbidden_text_static(&body) { | |
(StatusCode::OK, "true") | |
} else { | |
(StatusCode::OK, "false") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sync fn main() { | |
let num_workers = num_cpus::get(); | |
let mut work_txs = Vec::with_capacity(num_workers); | |
for _ in 0..num_workers { | |
const WORKER_QUEUE_SIZE: usize = 4; | |
let (tx, rx) = channel(WORKER_QUEUE_SIZE); | |
work_txs.push(tx); | |
thread::spawn(move || worker_entrypoint(rx)); | |
} | |
listener_entrypoint(addr, work_txs).await; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn worker_entrypoint(mut rx: Receiver<TcpStream>) { | |
let runtime = tokio::runtime::Builder::new_current_thread() | |
.enable_all() | |
.build() | |
.unwrap(); | |
runtime.block_on(async move { | |
// We build a new Axum app on every worker, ensuring that workers are independent. | |
let app = Router::new().route("/check", post(check)); | |
let service_factory = app.into_make_service_with_connect_info::<SocketAddr>(); | |
while let Some(stream) = rx.recv().await { | |
let peer_addr = stream.peer_addr().unwrap(); | |
// For each connection, we spawn a new task to handle it. | |
tokio::spawn({ | |
let mut service_factory = service_factory.clone(); | |
async move { | |
let service = service_factory.call(peer_addr).await.unwrap(); | |
let hyper_service = TowerToHyperService::new(service); | |
let http = hyper::server::conn::http1::Builder::new(); | |
// We do not care if the request handling succeeds or fails, so ignore result. | |
_ = http | |
.serve_connection(TokioIo::new(stream), hyper_service) | |
.await; | |
} | |
}); | |
} | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Capacity of each worker's channel.
const WORKER_QUEUE_SIZE: usize = 4;

let all_processors = ProcessorSet::all();
let num_workers = all_processors.len();
let mut work_txs = Vec::with_capacity(num_workers);

for _ in 0..num_workers {
    let (sender, receiver) = channel(WORKER_QUEUE_SIZE);
    work_txs.push(sender);

    // Each iteration starts one worker thread that the OS may schedule on any processor
    // in the set, balancing load among them. This is almost the same as calling
    // `thread::spawn()`. The difference is that going through `ProcessorSet::all()`
    // makes every processor available to these threads on Windows, including on
    // many-processor systems with multiple processor groups, where threads can
    // otherwise be limited to a single group.
    all_processors.spawn_thread(move |_| worker_entrypoint(receiver));
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
region_cached! { | |
pub static FORBIDDEN_TEXTS_REGION_CACHED: Vec<String> = generate_forbidden_texts(); | |
} | |
pub fn is_forbidden_text_region_cached(s: &str) -> bool { | |
FORBIDDEN_TEXTS_REGION_CACHED | |
.with_cached(|texts| texts.iter().any(|candidate| s.starts_with(candidate))) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let all_processors = ProcessorSet::all();
// Take the worker count from the same set that spawns the threads below, so the number
// of channels always matches the number of threads.
let num_workers = all_processors.len();

// Capacity of each worker's channel.
const WORKER_QUEUE_SIZE: usize = 4;

let (txs, rxs) = (0..num_workers)
    .map(|_| channel(WORKER_QUEUE_SIZE))
    .unzip::<_, _, Vec<_>, Vec<_>>();

// Each worker thread will pop one Receiver out of this vector, so it needs to be shared.
let rxs = Arc::new(Mutex::new(rxs));

// This method will create one thread per processor and execute the callback on each of them.
// Every thread will be pinned to a specific processor, so the OS will not move them around.
all_processors.spawn_threads(move |_| {
    // The lock guard is a temporary of this statement, so the mutex is released before
    // the worker starts running.
    let rx = rxs
        .lock()
        .expect("receiver list mutex was poisoned by a panicking worker")
        .pop()
        .expect("exactly one receiver exists per spawned worker thread");

    worker_entrypoint(rx);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment