Skip to content

Instantly share code, notes, and snippets.

@edgarogh
Created July 25, 2024 00:38
Show Gist options
  • Save edgarogh/171e51b3914a039a9b7a4ec83cc38dae to your computer and use it in GitHub Desktop.
Save edgarogh/171e51b3914a039a9b7a4ec83cc38dae to your computer and use it in GitHub Desktop.
A map that lets a producer promise a value for a given key, while a consumer awaits the completion of that promise
use std::borrow::Borrow;
use std::hash::Hash;
use std::sync::Arc;
use dashmap::{DashMap, Entry};
use tokio::sync::oneshot;
/// State of the receiving half of a promise's oneshot channel, as stored in the map.
enum Receiver<V> {
/// The receiver sits in the map and can be taken out by a waiter.
Available(oneshot::Receiver<V>),
/// The receiver has been moved out of the map by a `ReceiverHandle`, which
/// left this placeholder behind so that the entry stays occupied.
Polling,
}
/// Temporary owner of a receiver that was moved out of a map entry.
///
/// While the handle exists the entry holds `Receiver::Polling`. The handle's
/// `Drop` impl hands the receiver back to the entry if that entry still exists.
///
/// The trait bounds are declared on the struct itself because the `Drop` impl
/// needs them, and a `Drop` impl may not require more than the type does.
struct ReceiverHandle<'m, 'k, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> {
// Map the receiver was taken from (and is given back to on drop).
map: &'m DashMap<K, Receiver<V>>,
// Borrowed form of the key identifying the entry.
key: &'k Q,
// The receiver itself; an `Option` so that `Drop` can move it out of `&mut self`.
receiver: Option<oneshot::Receiver<V>>,
}
impl<'m, 'k, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> ReceiverHandle<'m, 'k, Q, K, V> {
    /// Moves the receiver stored under `key` out of `map`, leaving
    /// `Receiver::Polling` in its place.
    ///
    /// Returns `None` when `map` has no entry for `key`, or when the entry is
    /// already in the `Polling` state (it is then left in that state).
    pub fn take(map: &'m DashMap<K, Receiver<V>>, key: &'k Q) -> Option<Self>
    where
        Q: Eq + Hash,
    {
        let mut slot = map.get_mut(key)?;
        // Swap unconditionally: if the slot was already `Polling`, it simply stays so.
        match std::mem::replace(slot.value_mut(), Receiver::Polling) {
            Receiver::Available(receiver) => Some(Self {
                map,
                key,
                receiver: Some(receiver),
            }),
            Receiver::Polling => None,
        }
    }
}
impl<'m, 'k, Q: Eq + Hash + ?Sized, K: Eq + Hash + Borrow<Q>, V> Drop
    for ReceiverHandle<'m, 'k, Q, K, V>
{
    /// Gives the receiver back to its map entry so another waiter can take it.
    ///
    /// The receiver is only restored when the entry still exists *and* is still
    /// in the `Polling` state. If the entry is gone, the receiver is dropped.
    /// If the entry was removed and the key has since been promised again, the
    /// entry belongs to that newer promise and is left untouched. This
    /// situation is reachable through the public API (drop a promise while a
    /// waiter is pending, then promise the same key again before the waiter
    /// runs), so it must not panic; panicking inside `drop` could also abort
    /// the process during unwinding.
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            return;
        };
        let Some(mut entry) = self.map.get_mut(self.key) else {
            return;
        };
        if matches!(entry.value(), Receiver::Polling) {
            *entry = Receiver::Available(receiver);
        }
    }
}
/// A concurrent map in which a producer can promise a value for a key and a
/// consumer can wait for that promise to be fulfilled.
pub struct RendezvousMap<K, V> {
// Shared behind an `Arc` so that promises can keep a weak reference to the map.
inner: Arc<DashMap<K, Receiver<V>>>,
}
impl<K: Eq + Hash, V> RendezvousMap<K, V> {
    /// Creates an empty map; identical to `RendezvousMap::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl<K: Eq + Hash, V> Default for RendezvousMap<K, V> {
    /// Builds a map with no pending promises.
    fn default() -> Self {
        let inner = Arc::new(DashMap::new());
        Self { inner }
    }
}
impl<K: 'static + Clone + Eq + Hash, V: 'static> RendezvousMap<K, V> {
    /// Registers a promise for `key`.
    ///
    /// Returns `None` if the map already has an entry for `key`. Otherwise a
    /// fresh oneshot channel is created, its receiver is stored in the map and
    /// the returned [`Promise`] holds the sender. The promise also carries a
    /// cleanup closure that removes `key` from the map, provided the map is
    /// still alive when the closure runs.
    pub fn promise(&self, key: K) -> Option<Promise<V>> {
        let Entry::Vacant(slot) = self.inner.entry(key.clone()) else {
            return None;
        };

        let (sender, receiver) = oneshot::channel();
        // The guard returned by `insert` is a temporary, so the entry is
        // unlocked again as soon as this statement ends.
        slot.insert(Receiver::Available(receiver));

        // Only a weak reference is captured: a pending promise does not keep
        // the map alive.
        let weak_map = Arc::downgrade(&self.inner);
        let cancel: Box<dyn FnOnce()> = Box::new(move || {
            if let Some(map) = weak_map.upgrade() {
                map.remove(&key);
            }
        });

        Some(Promise {
            cancel: Some(cancel),
            inner: Some(sender),
        })
    }

    /// Waits for the value promised under `key`.
    ///
    /// Returns `None` when there is no entry for `key`, when the entry's
    /// receiver is currently taken by another waiter, or when the channel
    /// completes with an error instead of a value. On success the entry is
    /// removed from the map before the value is returned.
    ///
    /// The receiver is held through a `ReceiverHandle` for the duration of the
    /// wait, so if this future is dropped early the handle's `Drop` impl gets
    /// the chance to give the receiver back to the map.
    pub async fn wait_for<'s, 'k, Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        let mut handle = ReceiverHandle::take(&self.inner, key)?;
        let received = handle.receiver.as_mut().unwrap().await;
        // On error, return early; `handle` is dropped on the way out.
        let value = received.ok()?;
        // Remove the entry while `handle` is still alive, so its `Drop` finds
        // nothing to restore.
        self.inner.remove(key);
        Some(value)
    }
}
/// Producer-side handle for a value promised under a key of a `RendezvousMap`.
pub struct Promise<V> {
// Cleanup callback invoked from `Drop`. As built by `RendezvousMap::promise`,
// it removes the promised key from the map if the map is still alive.
// Stored as an `Option` because calling a `FnOnce` requires moving it out.
cancel: Option<Box<dyn FnOnce()>>,
// Sending half of the oneshot channel; an `Option` so `resolve` can move it out.
inner: Option<oneshot::Sender<V>>,
}
impl<V> Promise<V> {
    /// Fulfils the promise with `v`, consuming it.
    ///
    /// When the value is accepted by the channel, the cleanup callback is
    /// disarmed so that dropping the promise at the end of this call does
    /// *not* remove the map entry. The entry (and the value buffered in its
    /// receiver) therefore stays available until a consumer collects it, even
    /// if that consumer only starts waiting after `resolve` returned. It also
    /// leaves the entry's removal to the consumer that collects the value, so
    /// a later promise for the same key cannot have its entry removed by
    /// mistake.
    ///
    /// Consequence: a resolved value that nobody ever waits for keeps its
    /// entry in the map.
    ///
    /// If the channel rejects the value (its receiver no longer exists), the
    /// value is discarded and the cleanup callback still runs on drop.
    pub fn resolve(mut self, v: V) {
        let sender = self
            .inner
            .take()
            .expect("`inner` is only taken by `resolve`, which consumes the promise");
        if sender.send(v).is_ok() {
            // Delivered: this is a completion, not a cancellation.
            self.cancel = None;
        }
    }
}
impl<V> Drop for Promise<V> {
    /// Runs the cleanup callback, if one is still stored.
    ///
    /// This body executes before the struct's fields are dropped, so the
    /// callback runs while the sending half of the channel is still alive.
    fn drop(&mut self) {
        let Some(cancel) = self.cancel.take() else {
            return;
        };
        cancel();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment