Skip to content

Instantly share code, notes, and snippets.

@jb55
Created March 25, 2025 18:30
Show Gist options
  • Save jb55/47c594baade21f4d5c65da7c860f31aa to your computer and use it in GitHub Desktop.
Save jb55/47c594baade21f4d5c65da7c860f31aa to your computer and use it in GitHub Desktop.
use std::sync::mpsc::{self, Sender};
use tokio::sync::oneshot;
use futures::Future;
// A queued job with its signature erased to "no arguments, no return value".
// Erasing the signature lets jobs with different result types travel through
// the same queue: each job is itself responsible for sending its typed result
// (`JobPool::schedule` wraps the caller's closure so it does exactly that).
type ErasedJob = Box<dyn FnOnce() + Send + 'static>;
// Handle to a set of worker threads that execute type-erased jobs.
struct JobPool {
// Sending half of the job queue; `schedule` pushes boxed jobs through it.
tx: Sender<ErasedJob>,
}
impl JobPool {
fn new(num_threads: usize) -> Self {
let (tx, rx) = mpsc::channel::<ErasedJob>();
for _ in 0..num_threads {
let rx_clone = rx.clone();
std::thread::spawn(move || {
while let Ok(job) = rx_clone.recv() {
job(); // no return value here
}
});
}
Self { tx }
}
// A fully generic schedule:
// - each job can return a distinct T
// - we capture T in the oneshot
fn schedule<F, T>(&self, job: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx_result, rx_result) = oneshot::channel::<T>();
// Erase the job to "no return" by capturing `tx_result`
// and sending the output from job() into it
let erased_job = Box::new(move || {
let output = job();
let _ = tx_result.send(output);
});
// Put this erased closure on the queue
self.tx.send(erased_job).unwrap();
// The future just waits on the typed rx_result
async move {
rx_result.await.unwrap_or_else(|_| {
panic!("Worker thread or channel dropped before returning the result.")
})
}
}
}
/// Demo entry point: schedules two jobs with different result types on a
/// two-thread pool, then awaits and prints both results.
#[tokio::main]
async fn main() {
    let workers = JobPool::new(2);

    // Each scheduled closure is free to produce its own result type.
    let pending_text = workers.schedule(|| String::from("hello from string job"));
    let pending_number = workers.schedule(|| 123_u64);

    println!("(Meanwhile we can do more async work) ...");

    // Collect the results in the same order the jobs were submitted.
    let text: String = pending_text.await;
    let number: u64 = pending_number.await;

    println!("Got string: {:?}", text);
    println!("Got integer: {}", number);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment