Last active
September 16, 2021 06:17
-
-
Save huntc/ca11f34e15b5c7a06873a44064e2b8e1 to your computer and use it in GitHub Desktop.
Yew based EventSource agent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! An EventSource is an agent that can be subscribed to; when server-sent events
//! are received, the subscribers are notified.
use std::collections::HashSet;

use serde::de::DeserializeOwned;
use wasm_bindgen::{prelude::*, JsCast};
use web_sys::MessageEvent;
use yew::worker::{Agent, AgentLink, Context, HandlerId};
/// Internal messages the agent sends to itself from the browser callbacks
/// registered on the underlying `web_sys::EventSource`.
#[derive(Debug)]
pub enum Msg<T> {
    /// The `open` callback fired.
    Open,
    /// The `error` callback fired.
    Error,
    /// A subscribed event arrived: the event type name and the payload
    /// deserialized from the event's string data.
    Event(String, T),
}
/// Requests that a subscriber can send to the agent.
#[derive(Clone, Debug)]
pub enum Request {
    /// Open an event source at `uri` and listen for the named event types.
    Connect {
        /// Names of the events a listener is registered for.
        event_types: Vec<String>,
        /// Address handed to `web_sys::EventSource::new`.
        uri: String,
    },
}
/// Notifications the agent sends to every subscriber.
#[derive(Clone, Debug)]
pub enum Response<T>
where
    T: Clone,
{
    /// The event source reported that it is open.
    Connected,
    /// A subscribed event arrived: the event type name and its deserialized payload.
    Event(String, T),
    /// An error was reported and the event source is not attempting to reconnect.
    FailedConnection,
    /// An error was reported while the event source is in its `CONNECTING` state.
    Reconnecting,
}
struct JsEventSource { | |
event_source: web_sys::EventSource, | |
_on_error: Closure<dyn Fn(JsValue)>, | |
_on_event: Closure<dyn Fn(JsValue)>, | |
_on_open: Closure<dyn Fn(JsValue)>, | |
} | |
impl Drop for JsEventSource { | |
fn drop(&mut self) { | |
self.event_source.close(); | |
} | |
} | |
pub struct EventSource<T> | |
where | |
T: 'static + DeserializeOwned + Clone, | |
{ | |
current_event_source: Option<JsEventSource>, | |
link: AgentLink<EventSource<T>>, | |
subscribers: HashSet<HandlerId>, | |
} | |
impl<T> Agent for EventSource<T> | |
where | |
T: 'static + DeserializeOwned + Clone, | |
{ | |
type Reach = Context<Self>; | |
type Message = Msg<T>; | |
type Input = Request; | |
type Output = Response<T>; | |
fn create(link: AgentLink<Self>) -> Self { | |
Self { | |
current_event_source: None, | |
link, | |
subscribers: HashSet::new(), | |
} | |
} | |
fn update(&mut self, msg: Self::Message) { | |
let notification = match msg { | |
Msg::Event(e, d) => Response::Event(e, d), | |
Msg::Error => { | |
if let Some(JsEventSource { event_source, .. }) = &self.current_event_source { | |
if event_source.ready_state() == web_sys::EventSource::CONNECTING { | |
Response::Reconnecting | |
} else { | |
Response::FailedConnection | |
} | |
} else { | |
self.current_event_source = None; | |
Response::FailedConnection | |
} | |
} | |
Msg::Open => Response::Connected, | |
}; | |
for sub in self.subscribers.iter() { | |
self.link.respond(*sub, notification.to_owned()); | |
} | |
} | |
fn handle_input(&mut self, msg: Self::Input, _id: HandlerId) { | |
self.current_event_source = match msg { | |
Request::Connect { event_types, uri } => match web_sys::EventSource::new(&uri) { | |
Ok(event_source) => { | |
let on_open_link = self.link.clone(); | |
let on_open = Closure::wrap(Box::new(move |_| { | |
on_open_link.send_message(Msg::Open); | |
}) as Box<dyn Fn(JsValue)>); | |
event_source.set_onopen(Some(on_open.as_ref().unchecked_ref())); | |
let on_error_link = self.link.clone(); | |
let on_error = Closure::wrap(Box::new(move |_| { | |
on_error_link.send_message(Msg::Error); | |
}) as Box<dyn Fn(JsValue)>); | |
event_source.set_onerror(Some(on_error.as_ref().unchecked_ref())); | |
let on_message_link = self.link.clone(); | |
let on_message = Closure::wrap(Box::new(move |v: JsValue| { | |
if let Some((event_type, data)) = v | |
.dyn_into::<MessageEvent>() | |
.ok() | |
.map(|me| me.data().as_string().map(|d| (me.type_(), d))) | |
.flatten() | |
.map(|(et, d)| serde_json::from_str::<T>(&d).ok().map(|d| (et, d))) | |
.flatten() | |
{ | |
on_message_link.send_message(Msg::Event(event_type, data)) | |
} | |
}) as Box<dyn Fn(JsValue)>); | |
for event_type in event_types { | |
event_source | |
.add_event_listener_with_callback( | |
&event_type, | |
on_message.as_ref().unchecked_ref(), | |
) | |
.unwrap(); | |
} | |
Some(JsEventSource { | |
event_source, | |
_on_error: on_error, | |
_on_event: on_message, | |
_on_open: on_open, | |
}) | |
} | |
Err(_) => None, | |
}, | |
} | |
} | |
fn connected(&mut self, id: HandlerId) { | |
self.subscribers.insert(id); | |
} | |
fn disconnected(&mut self, id: HandlerId) { | |
self.subscribers.remove(&id); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment