Last active
April 15, 2020 22:13
-
-
Save jbr/5d9f9c1e628b97c9e20984ee3eb4fb4c to your computer and use it in GitHub Desktop.
hacked-up async_sse
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if nobody is listening to an event stream, it never gets encoded, | |
since encoding happens on the consumption end of the channel | |
library users can use StreamExt to subset/filter/transform events | |
before they're encoded, since user-defined events are on the channel, | |
not already-encoded text | |
channel specifics are outside of the purview of this code. as long as | |
it implements a stream of Events, it can be encoded as an SSE | |
response. if there's some sort of fancy fanout logic that isn't | |
expressed with a BroadcastChannel and StreamExt, that'll still be | |
workable | |
each event is encoded as a single contiguous u8 slice, ensuring that lines can't be | |
accidentally interleaved (seems possible with current async_sse?) | |
--- | |
Biggest disadvantage: | |
events are encoded once per listener, which is potentially expensive | |
with a lot of connections. ideally this would be cached somewhere, but | |
i'm not comfortable enough with rust data structures to know the right | |
way to do this. some sort of weak map where the value Drops when the | |
key drops? |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this file is entirely "library user code" | |
mod sse; | |
use async_std::{ | |
stream::StreamExt, | |
sync::{Arc, Mutex}, | |
}; | |
use broadcaster::BroadcastChannel; | |
use sse::{Event, EventStream}; | |
use tide::Response; | |
use MyEvent::*; | |
#[derive(Clone, Debug)] | |
enum MyEvent { | |
A, | |
B, | |
} | |
impl Event for MyEvent { | |
fn name(&self) -> &str { | |
match self { | |
B => "A", | |
A => "B", | |
} | |
} | |
fn data(&self) -> &[u8] { | |
match self { | |
A => "hey from an A message".as_bytes(), | |
B => "hello from a B message".as_bytes(), | |
} | |
} | |
fn id(&self) -> Option<&str> { | |
None | |
} | |
} | |
type Request = tide::Request<Arc<Mutex<BroadcastChannel<MyEvent>>>>; | |
async fn send_a(request: Request) -> Response { | |
let state = request.state(); | |
state.lock().await.send(&A).await.unwrap(); | |
Response::new(200) | |
} | |
async fn send_b(request: Request) -> Response { | |
let state = request.state(); | |
state.lock().await.send(&B).await.unwrap(); | |
Response::new(200) | |
} | |
async fn stream_all(request: Request) -> Response { | |
let state = request.state(); | |
state.lock().await.clone().into_response() | |
} | |
async fn stream_only_a(request: Request) -> Response { | |
let state = request.state(); | |
let broadcast = state.lock().await.clone(); | |
broadcast.filter(|x| matches!(x, A)).into_response() | |
} | |
async fn stream_only_b(request: Request) -> Response { | |
let state = request.state(); | |
let broadcast = state.lock().await.clone(); | |
broadcast.filter(|x| matches!(x, B)).into_response() | |
} | |
#[async_std::main] | |
async fn main() { | |
let stream = BroadcastChannel::new(); | |
let mut server = tide::with_state(Arc::new(Mutex::new(stream))); | |
server.at("/a").post(send_a).get(stream_only_a); | |
server.at("/b").post(send_b).get(stream_only_b); | |
server.at("/").get(stream_all); | |
server.listen("127.0.0.1:3131").await.unwrap(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// in case anyone sees this without context, a lot of this is from | |
// https://github.com/http-rs/async-sse/blob/master/src/encoder.rs | |
use async_std::{ | |
stream::Stream, | |
io::{BufReader, Read as AsyncRead}, | |
task::{ready, Context, Poll}, | |
}; | |
use std::{io, pin::Pin}; | |
pin_project_lite::pin_project! { | |
/// An SSE protocol encoder. | |
#[derive(Debug, Clone)] | |
pub struct Encoder<S> { | |
buf: Option<Vec<u8>>, | |
#[pin] | |
receiver: S, | |
cursor: usize, | |
} | |
} | |
impl<E, S> AsyncRead for Encoder<S> | |
where | |
E: Event, | |
S: Unpin + Stream<Item = E>, | |
{ | |
fn poll_read( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context<'_>, | |
buf: &mut [u8], | |
) -> Poll<io::Result<usize>> { | |
// Request a new buffer if we don't have one yet. | |
if let None = self.buf { | |
log::trace!("> waiting for event"); | |
self.buf = match ready!(Pin::new(&mut self.receiver).poll_next(cx)) { | |
Some(event) => { | |
let encoded = encode(&event); | |
log::trace!("> Received a new event with len {}", encoded.len()); | |
Some(encoded) | |
} | |
None => { | |
log::trace!("> Encoder done reading"); | |
return Poll::Ready(Ok(0)); | |
} | |
}; | |
}; | |
// Write the current buffer to completion. | |
let local_buf = self.buf.as_mut().unwrap(); | |
let local_len = local_buf.len(); | |
let max = buf.len().min(local_buf.len()); | |
buf[..max].clone_from_slice(&local_buf[..max]); | |
self.cursor += max; | |
// Reset values if we're done reading. | |
if self.cursor == local_len { | |
self.buf = None; | |
self.cursor = 0; | |
}; | |
// Return bytes read. | |
Poll::Ready(Ok(max)) | |
} | |
} | |
pub trait Event { | |
fn name(&self) -> &str; | |
fn data(&self) -> &[u8]; | |
fn id(&self) -> Option<&str>; | |
} | |
pub trait EventStream: Sized + Unpin + Send { | |
fn into_encoder(self) -> Encoder<Self>; | |
fn into_response(self) -> tide::Response; | |
} | |
fn encode<'a, E: Event>(event: &'a E) -> Vec<u8> { | |
log::trace!("> encoding event "); | |
let mut data = String::new(); | |
data.push_str(&format!("event:{}\n", event.name())); | |
if let Some(id) = event.id() { | |
data.push_str(&format!("id:{}\n", id)); | |
} | |
data.push_str("data:"); | |
let mut data = data.into_bytes(); | |
data.extend_from_slice(event.data()); | |
data.push(b'\n'); | |
data.push(b'\n'); | |
data | |
} | |
use tide::IntoResponse; | |
impl<S, E> IntoResponse for Encoder<S> | |
where | |
S: Send + Unpin + Stream<Item = E> + 'static, | |
E: Event, | |
{ | |
fn into_response(self) -> tide::Response { | |
tide::Response::with_reader(200, BufReader::new(self)) | |
.set_header("cache-control", "no-cache") | |
.set_header("content-type", "text/event-stream") | |
} | |
} | |
impl<E: Event, S: Send + Unpin + Stream<Item = E> + 'static> EventStream for S { | |
fn into_encoder(self) -> Encoder<Self> { | |
Encoder { | |
receiver: self, | |
buf: None, | |
cursor: 0, | |
} | |
} | |
fn into_response(self) -> tide::Response { | |
self.into_encoder().into_response() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment