Skip to content

Instantly share code, notes, and snippets.

@valyagolev
Last active May 28, 2023 04:36
Show Gist options
  • Save valyagolev/9ddd2805df88e125dc95ab5d106f7cb2 to your computer and use it in GitHub Desktop.
Save valyagolev/9ddd2805df88e125dc95ab5d106f7cb2 to your computer and use it in GitHub Desktop.
use std::{mem::ManuallyDrop, time::Duration};
use dioxus::prelude::Scope;
use tokio::sync::oneshot::{self, Sender};
/// Guard stored in hook state by the periodic-update hooks in this file.
///
/// Its `Drop` implementation sends `()` through the wrapped oneshot sender;
/// the background tasks spawned by the hooks select on the matching receiver
/// and leave their loop when it resolves.
pub struct PeriodicUpdateSub {
// Wrapped in `ManuallyDrop` so that `drop(&mut self)` can move the sender
// out by value before calling `send` on it.
sender: ManuallyDrop<Sender<()>>,
}
impl Drop for PeriodicUpdateSub {
    /// Tells the background task to stop by sending `()` on the stored sender.
    ///
    /// Never panics: a failed send is ignored, because it only means the
    /// receiving side no longer exists and there is nothing left to stop.
    fn drop(&mut self) {
        // SAFETY: this is the only place that takes the value out of
        // `self.sender` (the field is private and, in the code visible here,
        // is otherwise only ever constructed), and `drop` runs at most once,
        // so the slot is still initialised and is never read again afterwards.
        let sender = unsafe { ManuallyDrop::take(&mut self.sender) };

        // `send` returns `Err` when the receiver has already been dropped,
        // e.g. the task finished or the runtime was shut down first. That is
        // not an error for a stop signal, and panicking inside `drop` would
        // abort the process if it happened during unwinding.
        let _ = sender.send(());
    }
}
/// Hook that asks for the component to be re-rendered once every `interval`.
///
/// The `use_hook` initializer spawns a Tokio task that repeatedly sleeps for
/// `interval` and then invokes the callback obtained from
/// `cx.schedule_update()`. The hook state is a [`PeriodicUpdateSub`]; when that
/// state is dropped the task receives the stop signal and leaves its loop.
///
/// Uses `tokio::spawn`, so it has to be called from within a Tokio runtime.
pub fn use_periodic_update(cx: Scope, interval: Duration) {
    cx.use_hook(|| {
        let (stop_tx, mut stop_rx) = oneshot::channel();
        let request_render = cx.schedule_update();

        tokio::spawn(async move {
            loop {
                // `true` when the stop channel resolved (value sent or sender
                // dropped), `false` when the sleep elapsed first.
                let stopped = tokio::select! {
                    _ = &mut stop_rx => true,
                    _ = tokio::time::sleep(interval) => false,
                };
                if stopped {
                    break;
                }
                request_render();
            }
        });

        PeriodicUpdateSub {
            sender: ManuallyDrop::new(stop_tx),
        }
    });
}
/// Hook that re-evaluates an async computation every `interval` and exposes
/// its latest result.
///
/// The `use_hook` initializer spawns a Tokio task that loops over:
/// 1. await a fresh future produced by `future_fabric`,
/// 2. store its output in the shared slot and request a re-render,
/// 3. sleep for `interval`.
///
/// Both the computation and the sleep are raced against the stop signal sent
/// by the [`PeriodicUpdateSub`] kept in hook state, so the task ends promptly
/// once that state is dropped, even if the current future never completes.
///
/// # Parameters
/// * `cx` - scope of the calling component.
/// * `interval` - pause between the end of one computation and the start of
///   the next.
/// * `future_fabric` - factory called once per cycle to create the future.
///
/// # Returns
/// A read guard over the latest value; `None` until the first computation has
/// finished.
///
/// # Panics
/// Panics if the internal `RwLock` is poisoned. Uses `tokio::spawn`, so it has
/// to be called from within a Tokio runtime.
pub fn use_periodic_update_future<'a, T, F>(
    cx: &'a Scope,
    interval: Duration,
    future_fabric: impl Send + Sync + 'static + Fn() -> F,
) -> RwLockReadGuard<'a, Option<T>>
where
    T: Send + Sync + 'static,
    F: Send + Future<Output = T>,
{
    let (value, _) = cx.use_hook(|| {
        let value = Arc::new(RwLock::new(None));
        let update = cx.schedule_update();
        let (sender, mut receiver) = oneshot::channel();
        {
            let value = Arc::clone(&value);
            tokio::spawn(async move {
                loop {
                    // Race the computation against the stop signal so that a
                    // slow or stuck future cannot keep the task alive after
                    // the hook state has been dropped.
                    let val = tokio::select! {
                        _ = &mut receiver => break,
                        val = future_fabric() => val,
                    };
                    // The write guard is a temporary and is released at the
                    // end of this statement, before the re-render is requested.
                    *value.write().unwrap() = Some(val);
                    // Publish the new value right away; requesting the update
                    // only after the sleep would show every value one full
                    // `interval` late.
                    update();
                    tokio::select! {
                        _ = &mut receiver => break,
                        _ = tokio::time::sleep(interval) => {}
                    }
                }
            });
        }
        (
            value,
            // Kept in hook state only for its `Drop`, which stops the task.
            PeriodicUpdateSub {
                sender: ManuallyDrop::new(sender),
            },
        )
    });
    value.read().unwrap()
}
/// Drives `task` to completion unless the `kill` channel resolves first.
///
/// # Returns
/// * `Ok(output)` when `task` finishes first.
/// * `Err` with the message "Task was cancelled" when `kill` resolves first.
///   Any resolution of `kill` counts, whether a value was sent or the sender
///   was dropped, because the received result is not inspected.
pub async fn cancellable<T>(
    kill: oneshot::Receiver<()>,
    task: impl Future<Output = T>,
) -> anyhow::Result<T> {
    tokio::select! {
        _ = kill => Err(anyhow::anyhow!("Task was cancelled")),
        finished = task => Ok(finished),
    }
}
/// Hook that collects items from an async stream into a vector, pulling at
/// most `max_values` items in total.
///
/// The `use_hook` initializer spawns a Tokio task (wrapped in `cancellable`)
/// that creates the stream via `stream_creator`, then appends each item to the
/// shared vector and requests a re-render. Once the number of collected items
/// reaches the current limit the task waits on a watch channel; calling the
/// hook again with a different `max_values` publishes the new limit and wakes
/// it. The task ends when the stream is exhausted, when the limit sender is
/// gone, or when the cancel guard stored in hook state is dropped.
///
/// # Parameters
/// * `cx` - scope of the calling component.
/// * `max_values` - upper bound on how many items are pulled from the stream;
///   may change between renders.
/// * `stream_creator` - async factory producing the stream; called once.
///
/// # Returns
/// A read guard over the collected items; `None` until the stream has been
/// created.
///
/// # Panics
/// Panics if the internal `RwLock` is poisoned. Uses `tokio::spawn`, so it has
/// to be called from within a Tokio runtime.
pub fn use_stream<'a, T, F, S>(
    cx: &'a Scope,
    max_values: usize,
    stream_creator: impl Send + Sync + 'static + Fn() -> F,
) -> RwLockReadGuard<'a, Option<Vec<T>>>
where
    T: Send + Sync + 'static,
    F: Send + Future<Output = S>,
    S: Send + Stream<Item = T>,
{
    let (value, sender_max_vals, receiver_max_vals, _) = cx.use_hook(|| {
        let value = Arc::new(RwLock::new(None));
        let update = cx.schedule_update();
        let (sender_cancel, receiver_cancel) = oneshot::channel();
        let (sender_max_vals, receiver_max_vals) = watch::channel(max_values);
        {
            let mut receiver_max_vals = receiver_max_vals.clone();
            let value = Arc::clone(&value);
            tokio::spawn(cancellable(receiver_cancel, async move {
                let mut stream = pin!(stream_creator().await);
                let mut cnt = 0;
                *value.write().unwrap() = Some(Vec::new());
                // Announce the `None` -> `Some(vec![])` transition; otherwise a
                // stream that yields nothing would never trigger a re-render.
                update();
                loop {
                    while cnt >= *receiver_max_vals.borrow() {
                        // `changed` fails once the limit sender is dropped.
                        // The hook state drops that sender before the cancel
                        // guard, so a task parked here can observe the error
                        // on teardown; stop quietly instead of panicking.
                        if receiver_max_vals.changed().await.is_err() {
                            return;
                        }
                    }
                    let Some(val) = stream.next().await else {
                        return;
                    };
                    // The write guard is a temporary released at the end of
                    // this statement, so it is not held while the re-render
                    // is being requested.
                    value
                        .write()
                        .unwrap()
                        .as_mut()
                        .expect("vector is initialised before the loop starts")
                        .push(val);
                    cnt += 1;
                    update();
                }
            }));
        }
        (
            value,
            sender_max_vals,
            receiver_max_vals,
            // Kept in hook state only for its drop behaviour, which signals
            // `receiver_cancel`.
            SendOnDrop::new(sender_cancel),
        )
    });
    // Publish a changed limit. The hook state itself holds a receiver, so the
    // channel always has a live receiver and `send` cannot fail here.
    if *receiver_max_vals.borrow() != max_values {
        sender_max_vals.send(max_values).unwrap();
    }
    value.read().unwrap()
}
/// Example component: lists paths of the entries in `./data` using
/// `use_stream`, starting with a limit of 10 entries and offering a "More"
/// button that raises the limit by 100 per click.
pub fn Data(cx: Scope) -> Element {
// Current limit on how many directory entries are pulled from the stream.
let count = use_state(cx, || 10);
// NOTE(review): the two `unwrap`s below panic if `./data` cannot be opened
// or an individual entry cannot be read — confirm that is acceptable here.
let stream_files = use_stream(&cx, *count.clone(), || async move {
ReadDirStream::new(tokio::fs::read_dir("./data").await.unwrap()).map(|x| x.unwrap().path())
});
cx.render(rsx! {
button {
onclick: move |_| {
// Raise the limit by 100.
count.modify(|c| c + 100)
},
"More"
}
ul {
// While no vector has been published yet, render an empty list.
stream_files.as_ref().unwrap_or(&vec![]).iter().map(|x| {
rsx! {
li {
// NOTE(review): `to_str` yields `None` for a path that is not valid
// UTF-8, so this `unwrap` panics on such a file name.
x.to_str().unwrap()
}
}
})
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment