Created
October 16, 2018 17:24
-
-
Save jonhoo/a92f9137c4371509652c9082e1c64e1b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate async_bincode; | |
#[macro_use] | |
extern crate futures; | |
extern crate serde; | |
#[macro_use] | |
extern crate serde_derive; | |
extern crate bincode; | |
extern crate tokio; | |
extern crate tower_service; | |
#[derive(Serialize, Deserialize)] | |
pub struct Request; | |
#[derive(Serialize, Deserialize)] | |
pub struct Response; | |
use futures::sync::{mpsc, oneshot}; | |
use std::io; | |
use tokio::prelude::*; | |
struct ConnectionInner { | |
reqs: mpsc::UnboundedReceiver<(Request, oneshot::Sender<Response>)>, | |
stream: async_bincode::AsyncBincodeStream< | |
tokio::net::tcp::TcpStream, | |
Response, | |
Request, | |
async_bincode::SyncDestination, | |
>, | |
current: Option<oneshot::Sender<Response>>, | |
send: Option<Request>, | |
} | |
impl Future for ConnectionInner { | |
type Item = (); | |
type Error = bincode::Error; | |
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { | |
loop { | |
if self.current.is_none() { | |
match self.reqs.poll() { | |
Ok(Async::Ready(Some((req, reply)))) => { | |
self.current = Some(reply); | |
self.send = Some(req); | |
} | |
Ok(Async::Ready(None)) => { | |
// EOF | |
return Ok(Async::Ready(())); | |
} | |
Ok(Async::NotReady) => return Ok(Async::NotReady), | |
Err(_) => { | |
return Err(Box::new(bincode::ErrorKind::Io(io::Error::new( | |
io::ErrorKind::Interrupted, | |
"failed to receive request", | |
)))) | |
} | |
} | |
} | |
if let Some(r) = self.send.take() { | |
if let AsyncSink::NotReady(r) = self.stream.start_send(r)? { | |
self.send = Some(r); | |
return Ok(Async::NotReady); | |
} | |
} | |
try_ready!(self.stream.poll_complete()); | |
if let Some(r) = try_ready!(self.stream.poll()) { | |
// ignore send failure | |
let _ = self.current.take().unwrap().send(r); | |
} else { | |
// unexpected EOF | |
return Err(Box::new(bincode::ErrorKind::Io(io::Error::new( | |
io::ErrorKind::BrokenPipe, | |
"EOF while waiting for reply", | |
)))); | |
} | |
} | |
} | |
} | |
#[derive(Clone)] | |
pub struct Connection { | |
send: mpsc::UnboundedSender<(Request, oneshot::Sender<Response>)>, | |
} | |
impl Connection { | |
pub fn new( | |
stream: tokio::net::tcp::ConnectFuture, | |
) -> impl Future<Item = Connection, Error = ()> { | |
let (tx, rx) = mpsc::unbounded(); | |
future::lazy(move || { | |
tokio::spawn( | |
stream | |
.map_err(|e| panic!("{:?}", e)) | |
.and_then(move |s| ConnectionInner { | |
reqs: rx, | |
stream: s.into(), | |
current: None, | |
send: None, | |
}) | |
.map_err(|e| panic!("{:?}", e)), | |
); | |
Ok(Connection { send: tx }) | |
}) | |
} | |
} | |
impl tower_service::Service for Connection { | |
type Request = Request; | |
type Response = Response; | |
type Error = (); | |
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; | |
fn poll_ready(&mut self) -> Poll<(), Self::Error> { | |
Ok(Async::Ready(())) | |
} | |
fn call(&mut self, req: Self::Request) -> Self::Future { | |
let (tx, rx) = oneshot::channel(); | |
Box::new( | |
self.send | |
.unbounded_send((req, tx)) | |
.map_err(|_| ()) | |
.into_future() | |
.and_then(move |_| rx.map_err(|_| ())), | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment