Last active
May 28, 2023 04:36
-
-
Save valyagolev/9ddd2805df88e125dc95ab5d106f7cb2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{mem::ManuallyDrop, time::Duration}; | |
use dioxus::prelude::Scope; | |
use tokio::sync::oneshot::{self, Sender}; | |
pub struct PeriodicUpdateSub { | |
sender: ManuallyDrop<Sender<()>>, | |
} | |
impl Drop for PeriodicUpdateSub { | |
fn drop(&mut self) { | |
let sender = unsafe { ManuallyDrop::take(&mut self.sender) }; | |
sender.send(()).unwrap(); | |
} | |
} | |
pub fn use_periodic_update(cx: Scope, interval: Duration) { | |
cx.use_hook(|| { | |
let update = cx.schedule_update(); | |
let (sender, mut receiver) = oneshot::channel(); | |
tokio::spawn(async move { | |
loop { | |
tokio::select! { | |
_ = &mut receiver => break, | |
_ = tokio::time::sleep(interval) => { | |
update(); | |
} | |
} | |
} | |
}); | |
PeriodicUpdateSub { | |
sender: ManuallyDrop::new(sender), | |
} | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub fn use_periodic_update_future<'a, T, F>( | |
cx: &'a Scope, | |
interval: Duration, | |
future_fabric: impl Send + Sync + 'static + Fn() -> F, | |
) -> RwLockReadGuard<'a, Option<T>> | |
where | |
T: Send + Sync + 'static, | |
F: Send + Future<Output = T>, | |
{ | |
let (value, _) = cx.use_hook(|| { | |
let value = Arc::new(RwLock::new(None)); | |
let update = cx.schedule_update(); | |
let (sender, mut receiver) = oneshot::channel(); | |
{ | |
let value = value.clone(); | |
tokio::spawn(async move { | |
loop { | |
let val = Some(future_fabric().await); | |
*value.write().unwrap() = val; | |
tokio::select! { | |
_ = &mut receiver => break, | |
_ = tokio::time::sleep(interval) => { | |
update(); | |
} | |
} | |
} | |
}); | |
} | |
( | |
value, | |
PeriodicUpdateSub { | |
sender: ManuallyDrop::new(sender), | |
}, | |
) | |
}); | |
value.read().unwrap() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub async fn cancellable<T>( | |
kill: oneshot::Receiver<()>, | |
task: impl Future<Output = T>, | |
) -> anyhow::Result<T> { | |
tokio::select! { | |
output = task => Ok(output), | |
_ = kill => anyhow::bail!("Task was cancelled"), | |
} | |
} | |
pub fn use_stream<'a, T, F, S>( | |
cx: &'a Scope, | |
max_values: usize, | |
stream_creator: impl Send + Sync + 'static + Fn() -> F, | |
) -> RwLockReadGuard<'a, Option<Vec<T>>> | |
where | |
T: Send + Sync + 'static, | |
F: Send + Future<Output = S>, | |
S: Send + Stream<Item = T>, | |
{ | |
let (value, sender_max_vals, receiver_max_vals, _) = cx.use_hook(|| { | |
let value = Arc::new(RwLock::new(None)); | |
let update = cx.schedule_update(); | |
let (sender_cancel, receiver_cancel) = oneshot::channel(); | |
let (sender_max_vals, receiver_max_vals) = watch::channel(max_values); | |
{ | |
let mut receiver_max_vals = receiver_max_vals.clone(); | |
let value = value.clone(); | |
tokio::spawn(cancellable(receiver_cancel, async move { | |
let mut stream = pin!(stream_creator().await); | |
let mut cnt = 0; | |
*value.write().unwrap() = Some(Vec::new()); | |
loop { | |
while cnt >= *receiver_max_vals.borrow() { | |
receiver_max_vals.changed().await.unwrap(); | |
} | |
let Some(val) = stream.next().await else { | |
return; | |
}; | |
let mut opt = value.write().unwrap(); | |
let vec = opt.as_mut().unwrap(); | |
vec.push(val); | |
cnt += 1; | |
update(); | |
} | |
})); | |
} | |
( | |
value, | |
sender_max_vals, | |
receiver_max_vals, | |
SendOnDrop::new(sender_cancel), | |
) | |
}); | |
if *receiver_max_vals.borrow() != max_values { | |
sender_max_vals.send(max_values).unwrap(); | |
} | |
value.read().unwrap() | |
} | |
pub fn Data(cx: Scope) -> Element { | |
let count = use_state(cx, || 10); | |
let stream_files = use_stream(&cx, *count.clone(), || async move { | |
ReadDirStream::new(tokio::fs::read_dir("./data").await.unwrap()).map(|x| x.unwrap().path()) | |
}); | |
cx.render(rsx! { | |
button { | |
onclick: move |_| { | |
count.modify(|c| c + 100) | |
}, | |
"More" | |
} | |
ul { | |
stream_files.as_ref().unwrap_or(&vec![]).iter().map(|x| { | |
rsx! { | |
li { | |
x.to_str().unwrap() | |
} | |
} | |
}) | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment