// Gist jb55/47c594baade21f4d5c65da7c860f31aa, created March 25, 2025 18:30.
// Note: GitHub flagged the original file as containing hidden or bidirectional
// Unicode text that may be interpreted or compiled differently than it appears.
// Review it in an editor that reveals hidden Unicode characters.
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};

use futures::Future;
use tokio::sync::oneshot;
/// A type-erased unit of work: a boxed closure taking no arguments and
/// returning nothing.
///
/// The signature is erased to "no arguments, no return value" because each job
/// is itself responsible for delivering its typed result (see
/// `JobPool::schedule`, which captures a result channel inside the closure).
/// `Send + 'static` lets the closure be moved to, and run on, another thread.
type ErasedJob = Box<dyn FnOnce() + Send + 'static>;
struct JobPool { | |
tx: Sender<ErasedJob>, | |
} | |
impl JobPool { | |
fn new(num_threads: usize) -> Self { | |
let (tx, rx) = mpsc::channel::<ErasedJob>(); | |
for _ in 0..num_threads { | |
let rx_clone = rx.clone(); | |
std::thread::spawn(move || { | |
while let Ok(job) = rx_clone.recv() { | |
job(); // no return value here | |
} | |
}); | |
} | |
Self { tx } | |
} | |
// A fully generic schedule: | |
// - each job can return a distinct T | |
// - we capture T in the oneshot | |
fn schedule<F, T>(&self, job: F) -> impl Future<Output = T> | |
where | |
F: FnOnce() -> T + Send + 'static, | |
T: Send + 'static, | |
{ | |
let (tx_result, rx_result) = oneshot::channel::<T>(); | |
// Erase the job to "no return" by capturing `tx_result` | |
// and sending the output from job() into it | |
let erased_job = Box::new(move || { | |
let output = job(); | |
let _ = tx_result.send(output); | |
}); | |
// Put this erased closure on the queue | |
self.tx.send(erased_job).unwrap(); | |
// The future just waits on the typed rx_result | |
async move { | |
rx_result.await.unwrap_or_else(|_| { | |
panic!("Worker thread or channel dropped before returning the result.") | |
}) | |
} | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let pool = JobPool::new(2); | |
// Now each job can return different T | |
let future_str = pool.schedule(|| -> String { | |
"hello from string job".into() | |
}); | |
let future_int = pool.schedule(|| -> u64 { | |
123 | |
}); | |
println!("(Meanwhile we can do more async work) ..."); | |
let s = future_str.await; | |
let i = future_int.await; | |
println!("Got string: {:?}", s); | |
println!("Got integer: {}", i); | |
} |
// (End of gist. The GitHub "Sign up for free / Sign in to comment" page footer
// is not part of the source.)