Created
July 25, 2024 00:38
-
-
Save edgarogh/171e51b3914a039a9b7a4ec83cc38dae to your computer and use it in GitHub Desktop.
A map that lets a producer promise a value for a given key, while a consumer can await the completion of that promise.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::borrow::Borrow; | |
use std::hash::Hash; | |
use std::sync::Arc; | |
use dashmap::{DashMap, Entry}; | |
use tokio::sync::oneshot; | |
enum Receiver<V> { | |
Available(oneshot::Receiver<V>), | |
Polling, | |
} | |
struct ReceiverHandle<'m, 'k, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> { | |
map: &'m DashMap<K, Receiver<V>>, | |
key: &'k Q, | |
receiver: Option<oneshot::Receiver<V>>, | |
} | |
impl<'m, 'q, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> ReceiverHandle<'m, 'q, Q, K, V> { | |
pub fn take(map: &'m DashMap<K, Receiver<V>>, key: &'q Q) -> Option<Self> | |
where | |
Q: Eq + Hash, | |
{ | |
let mut entry = map.get_mut(key)?; | |
if let Receiver::Available(receiver) = | |
std::mem::replace(entry.value_mut(), Receiver::Polling) | |
{ | |
Some(Self { | |
map, | |
key, | |
receiver: Some(receiver), | |
}) | |
} else { | |
None | |
} | |
} | |
} | |
impl<'m, 'k, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> Drop | |
for ReceiverHandle<'m, 'k, Q, K, V> | |
{ | |
fn drop(&mut self) { | |
let Some(mut entry) = self.map.get_mut(self.key) else { | |
return; | |
}; | |
assert!( | |
matches!(entry.value(), Receiver::Polling), | |
"receiver was reinserted before it could be given back", | |
); | |
*entry = Receiver::Available(self.receiver.take().unwrap()); | |
} | |
} | |
pub struct RendezvousMap<K, V> { | |
inner: Arc<DashMap<K, Receiver<V>>>, | |
} | |
impl<K: Eq + Hash, V> RendezvousMap<K, V> { | |
pub fn new() -> Self { | |
Self::default() | |
} | |
} | |
impl<K: Eq + Hash, V> Default for RendezvousMap<K, V> { | |
fn default() -> Self { | |
Self { | |
inner: Arc::new(DashMap::new()), | |
} | |
} | |
} | |
impl<K: 'static + Clone + Eq + Hash, V: 'static> RendezvousMap<K, V> { | |
pub fn promise(&self, key: K) -> Option<Promise<V>> { | |
match self.inner.entry(key.clone()) { | |
Entry::Occupied(_) => None, | |
Entry::Vacant(entry) => { | |
let (sender, receiver) = oneshot::channel(); | |
let map = Arc::downgrade(&self.inner); | |
entry.insert(Receiver::Available(receiver)); | |
let cancel = move || { | |
if let Some(map) = map.upgrade() { | |
map.remove(&key); | |
} | |
}; | |
Some(Promise { | |
cancel: Some(Box::new(cancel)), | |
inner: Some(sender), | |
}) | |
} | |
} | |
} | |
pub async fn wait_for<'s, 'k, Q>(&self, key: &Q) -> Option<V> | |
where | |
K: Borrow<Q>, | |
Q: Eq + Hash + ?Sized, | |
{ | |
let mut entry = ReceiverHandle::take(&self.inner, key)?; | |
let v = entry.receiver.as_mut().unwrap().await.ok(); | |
if v.is_some() { | |
self.inner.remove(key); | |
} | |
v | |
} | |
} | |
pub struct Promise<V> { | |
cancel: Option<Box<dyn FnOnce()>>, | |
inner: Option<oneshot::Sender<V>>, | |
} | |
impl<V> Promise<V> { | |
pub fn resolve(mut self, v: V) { | |
let _ = self.inner.take().unwrap().send(v); | |
} | |
} | |
impl<V> Drop for Promise<V> { | |
fn drop(&mut self) { | |
if let Some(cancel) = self.cancel.take() { | |
cancel(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment