Created
March 16, 2022 14:16
-
-
Save MathieuDuponchelle/046ac18340bfb0d23fe548d543951e8b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use async_std::task; | |
use futures::channel::mpsc; | |
use futures::prelude::*; | |
use futures::ready; | |
use pin_project_lite::pin_project; | |
use serde::{Deserialize, Serialize}; | |
use std::collections::VecDeque; | |
use std::pin::Pin; | |
use std::task::{Context, Poll}; | |
use std::collections::HashMap; | |
pin_project! { | |
#[must_use = "streams do nothing unless polled"] | |
pub struct Controller { | |
#[pin] | |
stream: Option<Box<dyn Stream<Item=(String, IncomingMessage)> + Unpin + Send>>, | |
items: VecDeque<(String, OutgoingMessage)>, | |
producers: HashMap<String, String>, | |
} | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum IncomingMessage { | |
Foo, | |
} | |
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)] | |
pub enum OutgoingMessage { | |
Bar, | |
Baz, | |
} | |
impl Controller { | |
fn new( | |
stream: Option<Box<dyn Stream<Item = (String, IncomingMessage)> + Unpin + Send>>, | |
) -> Self { | |
Self { | |
stream, | |
items: VecDeque::new(), | |
producers: HashMap::new(), | |
} | |
} | |
pub fn handle( | |
mut self: Pin<&mut Self>, | |
peer_id: &str, | |
msg: IncomingMessage, | |
) -> VecDeque<(String, OutgoingMessage)> { | |
let mut ret = VecDeque::new(); | |
let this = self.as_mut().project(); | |
this.producers.insert("peer".to_string(), "peer".to_string()); | |
match msg { | |
IncomingMessage::Foo => { | |
ret.push_back((peer_id.to_string(), OutgoingMessage::Bar)); | |
ret.push_back((peer_id.to_string(), OutgoingMessage::Baz)); | |
} | |
} | |
ret | |
} | |
} | |
impl Stream for Controller { | |
type Item = (String, OutgoingMessage); | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
loop { | |
let this = self.as_mut().project(); | |
if let Some(item) = this.items.pop_front() { | |
break Poll::Ready(Some(item)); | |
} | |
if let Some(stream) = this.stream.as_pin_mut() { | |
match ready!(stream.poll_next(cx)) { | |
Some((peer_id, msg)) => { | |
let items = self.as_mut().handle(&peer_id, msg); | |
let this = self.as_mut().project(); | |
let _ = std::mem::replace(this.items, items); | |
} | |
None => { | |
break Poll::Ready(None); | |
} | |
} | |
} else { | |
break Poll::Ready(None); | |
} | |
} | |
} | |
} | |
struct Server { | |
tx: Option<mpsc::Sender<(String, String)>>, | |
receive_task_handle: Option<task::JoinHandle<()>>, | |
} | |
impl Server { | |
fn spawn< | |
I: for<'a> Deserialize<'a>, | |
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, I)> + Send>>) -> St, | |
St: Stream, | |
>( | |
factory: Factory, | |
) -> Self | |
where | |
St::Item: Serialize + std::fmt::Debug, | |
St: Send + Unpin + 'static, | |
{ | |
let (tx, rx) = mpsc::channel::<(String, String)>(1000); | |
let mut handler = | |
factory(Box::pin(rx.map(|(peer_id, msg)| { | |
(peer_id, serde_json::from_str::<I>(&msg).unwrap()) | |
}))); | |
let receive_task_handle = task::spawn(async move { | |
while let Some(msg) = handler.next().await { | |
eprintln!("Got message: {:?}", msg); | |
} | |
}); | |
Self { | |
tx: Some(tx), | |
receive_task_handle: Some(receive_task_handle), | |
} | |
} | |
async fn accept(&mut self) { | |
if let Some(mut tx) = self.tx.clone() { | |
tx.send(( | |
"peer".to_string(), | |
serde_json::to_string(&MyIncomingMessage::Base(IncomingMessage::Foo)) | |
.unwrap(), | |
)) | |
.await | |
.unwrap(); | |
} | |
} | |
} | |
impl Drop for Server { | |
fn drop(&mut self) { | |
let receive_task_handle = self.receive_task_handle.take(); | |
if let Some(mut tx) = self.tx.take() { | |
task::block_on(async move { | |
tx.close_channel(); | |
if let Some(handle) = receive_task_handle { | |
handle.await | |
} | |
}); | |
} | |
} | |
} | |
fn main() { | |
let mut server = Server::spawn(|stream| MyController::new(Some(Box::new(stream)))); | |
task::block_on(server.accept()); | |
} | |
pin_project! { | |
#[must_use = "streams do nothing unless polled"] | |
pub struct MyController { | |
#[pin] | |
stream: Option<Box<dyn Stream<Item=(String, MyIncomingMessage)> + Unpin + Send>>, | |
#[pin] | |
base: Controller, | |
items: VecDeque<(String, MyOutgoingMessage)>, | |
} | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum MyIncomingMessageInner { | |
Babar, | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum MyIncomingMessage { | |
Base(IncomingMessage), | |
Custom(MyIncomingMessageInner), | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize, Debug)] | |
pub enum MyOutgoingMessageInner { | |
FooFoo, | |
} | |
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)] | |
pub enum MyOutgoingMessage { | |
Base(OutgoingMessage), | |
Custom(MyOutgoingMessageInner), | |
} | |
impl MyController { | |
fn new(stream: Option<Box<dyn Stream<Item = (String, MyIncomingMessage)> + Unpin + Send>>) -> Self { | |
let base = Controller::new(None); | |
Self { | |
stream, | |
base, | |
items: VecDeque::new(), | |
} | |
} | |
pub fn handle( | |
self: Pin<&mut Self>, | |
peer_id: &str, | |
msg: MyIncomingMessage, | |
) -> VecDeque<(String, MyOutgoingMessage)> { | |
let this = self.project(); | |
let mut ret = VecDeque::new(); | |
match msg { | |
MyIncomingMessage::Base(msg) => { | |
ret = this | |
.base | |
.handle(peer_id, msg) | |
.drain(..) | |
.map(|(peer_id, msg)| (peer_id, MyOutgoingMessage::Base(msg))) | |
.collect(); | |
} | |
MyIncomingMessage::Custom(msg) => match msg { | |
MyIncomingMessageInner::Babar => { | |
ret.push_back(( | |
peer_id.to_string(), | |
MyOutgoingMessage::Custom(MyOutgoingMessageInner::FooFoo), | |
)); | |
} | |
}, | |
} | |
ret | |
} | |
} | |
impl Stream for MyController { | |
type Item = (String, MyOutgoingMessage); | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
loop { | |
let this = self.as_mut().project(); | |
if let Some(item) = this.items.pop_front() { | |
break Poll::Ready(Some(item)); | |
} | |
if let Some(stream) = this.stream.as_pin_mut() { | |
match ready!(stream.poll_next(cx)) { | |
Some((peer_id, msg)) => { | |
let items = self.as_mut().handle(&peer_id, msg); | |
let this = self.as_mut().project(); | |
let _ = std::mem::replace(this.items, items); | |
} | |
None => { | |
break Poll::Ready(None); | |
} | |
} | |
} else { | |
break Poll::Ready(None); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment