Last active
August 4, 2024 15:42
-
-
Save dicej/ec066dc4553bf16f8a78446aa2d0e535 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{ | |
future::{Future, IntoFuture}, | |
sync::oneshot, | |
}; | |
use std::{ | |
collections::VecDeque, | |
sync::{Arc, Mutex}, | |
}; | |
struct Inner<T> { | |
item: Option<T>, | |
wait_list: VecDeque<oneshot::Sender<T>>, | |
} | |
// `Resource` is like an asynchronous `Mutex` (and can probably be replaced by the futures 0.3 `Mutex` when we | |
// upgrade to that version). The `with` method can be used to acquire the wrapped resource asynchronously and pass | |
// it to a lambda which returns a future that releases the resource so it can be used by any waiting tasks. | |
// | |
// Usage: | |
// let shared_sink = resource::Resource::new(my_sink); | |
// let shared_sink_clone = shared_sink.clone(); | |
// ... | |
// let future = shared_sink.with(|sink| sink.send("hello, world!")); | |
// ... | |
// let other_future = shared_sink_clone.with(|sink| sink.send("hello from elsewhere!")); | |
#[derive(Clone)] | |
pub struct Resource<T> { | |
inner: Arc<Mutex<Inner<T>>>, | |
} | |
fn next<T>(inner: Arc<Mutex<Inner<T>>>, item: T) { | |
let mut inner = inner.lock().unwrap(); | |
if let Some(tx) = inner.wait_list.pop_front() { | |
let _ = tx.send(item); | |
} else { | |
inner.item = Some(item) | |
} | |
} | |
impl<T: Send + 'static> Resource<T> { | |
pub fn new(item: T) -> Self { | |
Resource { | |
inner: Arc::new(Mutex::new(Inner { | |
item: Some(item), | |
wait_list: VecDeque::new(), | |
})), | |
} | |
} | |
pub fn with<E, B: IntoFuture<Item = T, Error = E>, F: FnOnce(T) -> B + Sync + Send + 'static>( | |
&self, | |
fun: F, | |
) -> Box<dyn Future<Item = (), Error = E> + Send> | |
where | |
B::Future: Send + 'static, | |
{ | |
let mut inner = self.inner.lock().unwrap(); | |
if let Some(item) = inner.item.take() { | |
drop(inner); | |
let inner = self.inner.clone(); | |
Box::new(fun(item).into_future().map(|item| next(inner, item))) | |
as Box<dyn Future<Item = (), Error = E> + Send> | |
} else { | |
let (tx, rx) = oneshot::channel(); | |
inner.wait_list.push_back(tx); | |
drop(inner); | |
let inner = self.inner.clone(); | |
Box::new( | |
rx.map_err(|_| unreachable!()) | |
.and_then(move |item| fun(item).into_future().map(|item| next(inner, item))), | |
) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment