Created
December 13, 2022 22:46
-
-
Save sowbug/93a0187a7c3038e09d73270cd97184b5 to your computer and use it in GitHub Desktop.
Try #2 on threaded subscription, this time with a view() method on the app calling into the background struct's methods to construct the view.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// See https://gist.github.com/sowbug/40148c249136a037cc3b4b814e9de129 for try #1.
//
// See https://github.com/iced-rs/iced/discussions/1600 for more background.
use iced::{ | |
executor, time, | |
widget::{button, column, container, text}, | |
window, Application, Command, Event as IcedEvent, Settings, Theme, | |
}; | |
use iced_native::futures::channel::mpsc; | |
use iced_native::subscription::{self, Subscription}; | |
use std::{ | |
cell::RefCell, | |
sync::{Arc, Mutex}, | |
thread::JoinHandle, | |
time::Instant, | |
}; | |
enum ThingSubscriptionState { | |
Start, | |
Ready(JoinHandle<()>, mpsc::Receiver<ThingEvent>), | |
Ending(JoinHandle<()>), | |
Idle, | |
} | |
/// Commands sent from the app to the worker thread that owns the `Thing`.
///
/// The enum carries no payload, so it is cheap to copy and compare.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ThingInput {
    /// (Re)start the work: the receiver resets its progress counter.
    Start,
    /// Ask the worker to stop.
    Quit,
}
#[derive(Clone)] | |
enum ThingEvent { | |
Ready(mpsc::Sender<ThingInput>, Arc<Mutex<Thing>>), | |
ProgressReport(i32), | |
Quit, | |
} | |
impl std::fmt::Debug for ThingEvent { | |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
match self { | |
Self::Ready(_, _) => f.debug_tuple("Ready").finish(), | |
Self::ProgressReport(i) => write!(f, "ProgressReport {}", i), | |
ThingEvent::Quit => write!(f, "Quit"), | |
} | |
} | |
} | |
/// The unit of background work driven by the worker thread.
#[derive(Default)]
struct Thing {
    /// Progress counter: bumped by `work()`, reset to 0 by `ThingInput::Start`.
    counter: i32,
    /// Set once `ThingInput::Quit` has been handled.
    done: bool,
}
impl Thing { | |
pub fn work(&mut self) { | |
// This sleep represents a bunch of CPU-intensive work that isn't | |
// amenable to .await. On my Ryzen 4700U running Linux, any sleep() ends | |
// up limiting the work() cycle to about 16,000Hz, or about 60 | |
// microseconds/cycle, which isn't totally surprising because Tokio | |
// (which we're not using) says that its version of sleep "operates at | |
// millisecond granularity and should not be used for tasks that require | |
// high-resolution timers. The implementation is platform specific, and | |
// some platforms (specifically Windows) will provide timers with a | |
// larger resolution than 1 ms." All of which suggests that | |
// std::thread::sleep() would also have granularity limits coarser than | |
// what this API promises. | |
// | |
// If we comment out this sleep(), we go up just a bit from 16KHz to | |
// 2.8MHz. This is useful to prove the point that sleep() has an | |
// artificial lower bound on time, and that the flow in this prototype | |
// is likely to be sufficiently efficient for our needs. | |
std::thread::sleep(time::Duration::from_nanos(1)); | |
self.counter += 1; | |
} | |
pub fn update(&mut self, input: ThingInput) -> Option<ThingEvent> { | |
match input { | |
ThingInput::Start => { | |
println!("Thing Received ThingInput::Start"); | |
self.counter = 0; | |
None | |
} | |
ThingInput::Quit => { | |
println!("Thing Received ThingInput::Quit"); | |
self.done = true; | |
Some(ThingEvent::Quit) | |
} | |
} | |
} | |
pub fn done(&self) -> bool { | |
self.done | |
} | |
fn x(&self) -> i32 { | |
self.counter | |
} | |
fn subscription() -> Subscription<ThingEvent> { | |
subscription::unfold( | |
std::any::TypeId::of::<Thing>(), | |
ThingSubscriptionState::Start, | |
|state| async move { | |
match state { | |
ThingSubscriptionState::Start => { | |
// This channel lets the app send messages to the closure. | |
let (sender, mut receiver) = mpsc::channel::<ThingInput>(1024); | |
// This channel surfaces event messages from Thing as subscription events. | |
let (mut output_sender, output_receiver) = | |
mpsc::channel::<ThingEvent>(1024); | |
let thing = Arc::new(Mutex::new(Thing::default())); | |
let thing_ext = Arc::clone(&thing); | |
let handler = std::thread::spawn(move || loop { | |
if let Ok(mut thing) = thing.lock() { | |
thing.work(); | |
} | |
if let Ok(Some(input)) = receiver.try_next() { | |
println!("Subscription got message {:?}", input); | |
let event = if let Ok(mut thing) = thing.lock() { | |
thing.update(input) | |
} else { | |
None | |
}; | |
if let Some(event) = event { | |
let _ = output_sender.try_send(event); | |
} | |
} | |
let (x, done) = if let Ok(thing) = thing.lock() { | |
(thing.x(), thing.done()) | |
} else { | |
(0, true) | |
}; | |
let _ = output_sender.try_send(ThingEvent::ProgressReport(x)); | |
if done { | |
println!("Subscription exiting loop"); | |
break; | |
} | |
}); | |
( | |
Some(ThingEvent::Ready(sender, thing_ext)), | |
ThingSubscriptionState::Ready(handler, output_receiver), | |
) | |
} | |
ThingSubscriptionState::Ready(handler, mut output_receiver) => { | |
use iced_native::futures::StreamExt; | |
let event = output_receiver.select_next_some().await; | |
let mut done = false; | |
match event { | |
ThingEvent::Ready(_, _) => {} | |
ThingEvent::ProgressReport(_) => {} | |
ThingEvent::Quit => { | |
println!("Subscription peeked at ThingEvent::Quit"); | |
done = true; | |
} | |
} | |
if done { | |
println!("Subscription is forwarding ThingEvent::Quit to app"); | |
(Some(event), ThingSubscriptionState::Ending(handler)) | |
} else { | |
( | |
Some(event), | |
ThingSubscriptionState::Ready(handler, output_receiver), | |
) | |
} | |
} | |
ThingSubscriptionState::Ending(handler) => { | |
println!("Subscription ThingState::Ending"); | |
if let Ok(_) = handler.join() { | |
println!("Subscription handler.join()"); | |
} | |
// See https://github.com/iced-rs/iced/issues/1348 | |
return (None, ThingSubscriptionState::Idle); | |
} | |
ThingSubscriptionState::Idle => { | |
println!("Subscription ThingState::Idle"); | |
// I took this line from | |
// https://github.com/iced-rs/iced/issues/336, but I | |
// don't understand why it helps. I think it's necessary | |
// for the system to get a chance to process all the | |
// subscription results. | |
let _: () = iced::futures::future::pending().await; | |
(None, ThingSubscriptionState::Idle) | |
} | |
} | |
}, | |
) | |
} | |
} | |
/// Stub that shows how the app bootstraps itself from nothing to having
/// whatever persistent state it needs to run. This example would be
/// sufficient without it.
struct Loader {}

impl Loader {
    /// Pretends to load the persistent state; always reports success.
    async fn load() -> bool {
        let loaded = true;
        loaded
    }
}
#[derive(Clone, Debug)] | |
enum AppMessage { | |
Loaded(bool), | |
StartButtonPressed, | |
StopButtonPressed, | |
ThingEvent(ThingEvent), | |
Event(IcedEvent), | |
} | |
struct MyApp { | |
should_exit: bool, | |
sender: Option<mpsc::Sender<ThingInput>>, | |
done_with_thing: bool, | |
thing: Arc<Mutex<Thing>>, | |
last_view_time: RefCell<f32>, | |
last_thing_checkpoint: RefCell<Instant>, | |
last_thing_checkpoint_value: RefCell<i32>, | |
last_thing_count_per_second: RefCell<i32>, | |
} | |
impl Default for MyApp { | |
fn default() -> Self { | |
Self { | |
should_exit: Default::default(), | |
sender: Default::default(), | |
done_with_thing: Default::default(), | |
thing: Default::default(), | |
last_view_time: Default::default(), | |
last_thing_checkpoint: RefCell::new(Instant::now()), | |
last_thing_checkpoint_value: Default::default(), | |
last_thing_count_per_second: Default::default(), | |
} | |
} | |
} | |
impl Application for MyApp { | |
type Message = AppMessage; | |
type Theme = Theme; | |
type Executor = executor::Default; | |
type Flags = (); | |
fn new(_flags: Self::Flags) -> (Self, iced::Command<Self::Message>) { | |
( | |
Self::default(), | |
Command::perform(Loader::load(), AppMessage::Loaded), | |
) | |
} | |
fn title(&self) -> String { | |
"App Sandbox".to_string() | |
} | |
fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> { | |
match message { | |
AppMessage::Loaded(load_success) => { | |
if load_success { | |
*self = MyApp::default(); | |
} else { | |
todo!() | |
} | |
Command::none() | |
} | |
AppMessage::StartButtonPressed => { | |
if let Some(sender) = &mut self.sender { | |
let _ = sender.try_send(ThingInput::Start); | |
} | |
Command::none() | |
} | |
AppMessage::StopButtonPressed => { | |
if let Some(sender) = &mut self.sender { | |
let _ = sender.try_send(ThingInput::Quit); | |
} | |
Command::none() | |
} | |
AppMessage::ThingEvent(message) => { | |
match message { | |
ThingEvent::Ready(mut sender, thing) => { | |
self.thing = thing; | |
let _ = sender.try_send(ThingInput::Start); | |
self.sender = Some(sender); | |
} | |
ThingEvent::ProgressReport(_) => { | |
// This message could be a way for all the view's state | |
// to be delivered to the app, rather than locking down | |
// Thing with a mutex. Thing would probably want to | |
// rate-limit these updates to be closer to the intended | |
// app GUI update rate, rather than updating each time | |
// in its own much faster work loop. Or we could look at | |
// these updates as a stream or transaction log, | |
// allowing the app to sync all the state it needs to | |
// render a competent view. Depends on the use case. | |
} | |
ThingEvent::Quit => { | |
println!("App received ThingEvent::Quit"); | |
self.done_with_thing = true; | |
} | |
} | |
Command::none() | |
} | |
AppMessage::Event(event) => { | |
if let IcedEvent::Window(window::Event::CloseRequested) = event { | |
println!("App got window::Event::CloseRequested"); | |
if let Some(sender) = &mut self.sender { | |
let _ = sender.try_send(ThingInput::Quit); | |
} | |
self.should_exit = true; | |
} | |
Command::none() | |
} | |
} | |
} | |
fn view(&self) -> iced::Element<'_, Self::Message, iced::Renderer<Self::Theme>> { | |
let start = Instant::now(); | |
let thing_view: iced::Element<Self::Message> = if let Ok(thing) = self.thing.lock() { | |
container(text(format!("Progress: {}", thing.x()).to_string())).into() | |
} else { | |
container(text("oops".to_string())).into() | |
}; | |
let profiling_view = container(text(format!( | |
"last view time: {} msec; thing count/second: {}", | |
self.last_view_time.borrow(), | |
self.last_thing_count_per_second.borrow() | |
))); | |
let content = column![ | |
thing_view, | |
button("Start").on_press(AppMessage::StartButtonPressed), | |
button("Stop").on_press(AppMessage::StopButtonPressed), | |
profiling_view, | |
]; | |
let finish = Instant::now(); | |
*self.last_view_time.borrow_mut() = | |
finish.duration_since(start).as_micros() as f32 / 1000.0; | |
if finish | |
.duration_since(*self.last_thing_checkpoint.borrow()) | |
.as_secs() | |
>= 1 | |
{ | |
*self.last_thing_checkpoint.borrow_mut() = finish; | |
if let Ok(thing) = self.thing.lock() { | |
let new_x = thing.x(); | |
*self.last_thing_count_per_second.borrow_mut() = | |
new_x - *self.last_thing_checkpoint_value.borrow(); | |
*self.last_thing_checkpoint_value.borrow_mut() = new_x; | |
} | |
} | |
container(content).into() | |
} | |
fn subscription(&self) -> iced::Subscription<Self::Message> { | |
if self.done_with_thing { | |
iced_native::subscription::events().map(AppMessage::Event) | |
} else { | |
Subscription::batch([ | |
Thing::subscription().map(AppMessage::ThingEvent), | |
iced_native::subscription::events().map(AppMessage::Event), | |
]) | |
} | |
} | |
fn should_exit(&self) -> bool { | |
// We need to override this to ask the Thing thread to quit when the | |
// user asks to close the app. Otherwise the app will linger in a zombie | |
// state after the main window goes away. | |
self.should_exit | |
} | |
} | |
pub fn main() -> iced::Result { | |
let need_close_requested = true; | |
let settings = Settings { | |
exit_on_close_request: !need_close_requested, | |
..Settings::default() | |
}; | |
MyApp::run(settings) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment