Created
March 15, 2022 01:42
-
-
Save MathieuDuponchelle/9a15be9b1dc7c8c7a0a8fb9977e9cbf2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use async_std::task; | |
use futures::channel::mpsc; | |
use futures::prelude::*; | |
use pin_project_lite::pin_project; | |
use serde::{Deserialize, Serialize}; | |
use std::collections::VecDeque; | |
use std::pin::Pin; | |
use std::task::{Context, Poll}; | |
pin_project! { | |
#[must_use = "streams do nothing unless polled"] | |
pub struct Controller { | |
#[pin] | |
stream: Option<Box<dyn Stream<Item=(String, IncomingMessage)> + Unpin + Send>>, | |
items: VecDeque<(String, OutgoingMessage)> | |
} | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum IncomingMessage { | |
Foo, | |
} | |
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)] | |
pub enum OutgoingMessage { | |
Bar, | |
Baz, | |
} | |
impl Controller { | |
fn new( | |
stream: Option<Box<dyn Stream<Item = (String, IncomingMessage)> + Unpin + Send>>, | |
) -> Self { | |
Self { | |
stream, | |
items: VecDeque::new(), | |
} | |
} | |
pub fn handle( | |
self: Pin<&mut Self>, | |
peer_id: &str, | |
msg: IncomingMessage, | |
) -> VecDeque<(String, OutgoingMessage)> { | |
let mut ret = VecDeque::new(); | |
match msg { | |
IncomingMessage::Foo => { | |
ret.push_back((peer_id.to_string(), OutgoingMessage::Bar)); | |
ret.push_back((peer_id.to_string(), OutgoingMessage::Baz)); | |
} | |
} | |
ret | |
} | |
} | |
impl Stream for Controller { | |
type Item = (String, OutgoingMessage); | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let this = self.as_mut().project(); | |
if let Some(stream) = this.stream.as_pin_mut() { | |
if let Some(item) = this.items.pop_front() { | |
Poll::Ready(Some(item)) | |
} else { | |
match stream.poll_next(cx) { | |
Poll::Ready(Some((peer_id, msg))) => { | |
let mut items = self.as_mut().handle(&peer_id, msg); | |
let this = self.as_mut().project(); | |
this.items.append(&mut items); | |
match this.items.pop_front() { | |
Some(msg) => Poll::Ready(Some(msg)), | |
None => Poll::Pending, | |
} | |
} | |
Poll::Ready(None) => Poll::Ready(None), | |
Poll::Pending => Poll::Pending, | |
} | |
} | |
} else { | |
Poll::Ready(None) | |
} | |
} | |
} | |
struct Server { | |
tx: Option<mpsc::Sender<(String, String)>>, | |
receive_task_handle: Option<task::JoinHandle<()>>, | |
} | |
impl Server { | |
fn spawn< | |
I: for<'a> Deserialize<'a>, | |
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, I)> + Send>>) -> St, | |
St: Stream, | |
>( | |
factory: Factory, | |
) -> Self | |
where | |
St::Item: Serialize + std::fmt::Debug, | |
St: Send + Unpin + 'static, | |
{ | |
let (tx, rx) = mpsc::channel::<(String, String)>(1000); | |
let mut handler = | |
factory(Box::pin(rx.map(|(peer_id, msg)| { | |
(peer_id, serde_json::from_str::<I>(&msg).unwrap()) | |
}))); | |
let receive_task_handle = task::spawn(async move { | |
while let Some(msg) = handler.next().await { | |
eprintln!("Got message: {:?}", msg); | |
} | |
}); | |
Self { | |
tx: Some(tx), | |
receive_task_handle: Some(receive_task_handle), | |
} | |
} | |
async fn accept(&mut self) { | |
if let Some(mut tx) = self.tx.clone() { | |
tx.send(( | |
"peer".to_string(), | |
serde_json::to_string(&MyIncomingMessage::Custom(MyIncomingMessageInner::Babar)) | |
.unwrap(), | |
)) | |
.await | |
.unwrap(); | |
} | |
} | |
} | |
impl Drop for Server { | |
fn drop(&mut self) { | |
let receive_task_handle = self.receive_task_handle.take(); | |
if let Some(mut tx) = self.tx.take() { | |
task::block_on(async move { | |
tx.close_channel(); | |
if let Some(handle) = receive_task_handle { | |
handle.await | |
} | |
}); | |
} | |
} | |
} | |
fn main() { | |
let mut server = Server::spawn(|stream| MyController::new(Box::new(stream))); | |
task::block_on(server.accept()); | |
} | |
pin_project! { | |
#[must_use = "streams do nothing unless polled"] | |
pub struct MyController { | |
#[pin] | |
stream: Box<dyn Stream<Item=(String, MyIncomingMessage)> + Unpin + Send>, | |
#[pin] | |
base: Controller, | |
items: VecDeque<(String, MyOutgoingMessage)>, | |
} | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum MyIncomingMessageInner { | |
Babar, | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize)] | |
pub enum MyIncomingMessage { | |
Base(IncomingMessage), | |
Custom(MyIncomingMessageInner), | |
} | |
#[derive(serde_derive::Deserialize, serde_derive::Serialize, Debug)] | |
pub enum MyOutgoingMessageInner { | |
FooFoo, | |
} | |
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)] | |
pub enum MyOutgoingMessage { | |
Base(OutgoingMessage), | |
Custom(MyOutgoingMessageInner), | |
} | |
impl MyController { | |
fn new(stream: Box<dyn Stream<Item = (String, MyIncomingMessage)> + Unpin + Send>) -> Self { | |
let base = Controller::new(None); | |
Self { | |
stream, | |
base, | |
items: VecDeque::new(), | |
} | |
} | |
pub fn handle( | |
self: Pin<&mut Self>, | |
peer_id: &str, | |
msg: MyIncomingMessage, | |
) -> VecDeque<(String, MyOutgoingMessage)> { | |
let this = self.project(); | |
let mut ret = VecDeque::new(); | |
match msg { | |
MyIncomingMessage::Base(msg) => { | |
ret = this | |
.base | |
.handle(peer_id, msg) | |
.drain(..) | |
.map(|(peer_id, msg)| (peer_id, MyOutgoingMessage::Base(msg))) | |
.collect(); | |
} | |
MyIncomingMessage::Custom(msg) => match msg { | |
MyIncomingMessageInner::Babar => { | |
ret.push_back(( | |
peer_id.to_string(), | |
MyOutgoingMessage::Custom(MyOutgoingMessageInner::FooFoo), | |
)); | |
} | |
}, | |
} | |
ret | |
} | |
} | |
impl Stream for MyController { | |
type Item = (String, MyOutgoingMessage); | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let mut this = self.as_mut().project(); | |
if let Some(item) = this.items.pop_front() { | |
Poll::Ready(Some(item)) | |
} else { | |
match this.stream.as_mut().poll_next(cx) { | |
Poll::Ready(Some((peer_id, msg))) => { | |
let mut items = self.as_mut().handle(&peer_id, msg); | |
let this = self.as_mut().project(); | |
this.items.append(&mut items); | |
match this.items.pop_front() { | |
Some(msg) => Poll::Ready(Some(msg)), | |
None => Poll::Pending, | |
} | |
} | |
Poll::Ready(None) => Poll::Ready(None), | |
Poll::Pending => Poll::Pending, | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment