Skip to content

Instantly share code, notes, and snippets.

@tobz
Created October 4, 2018 17:07
Show Gist options
  • Save tobz/81e4301e1f2b8bcc1935b7ca767f9c9c to your computer and use it in GitHub Desktop.
Save tobz/81e4301e1f2b8bcc1935b7ca767f9c9c to your computer and use it in GitHub Desktop.
diff --git a/src/backend/backend.rs b/src/backend/backend.rs
index fc64df4..c2fbe40 100644
--- a/src/backend/backend.rs
+++ b/src/backend/backend.rs
@@ -25,12 +25,34 @@ use futures::{
future::{ok, Either, Shared},
prelude::*,
sync::mpsc,
+ Poll,
};
use futures_turnstyle::Waiter;
use protocol::errors::ProtocolError;
-use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc};
-use tokio::net::TcpStream;
+use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
+use tokio::net::tcp::TcpStream;
use util::{WorkQueue, Worker};
+use tokio::timer::Timeout;
+use tokio::timer::timeout::Error as TimeoutError;
+
+type MaybeTimeout<F> = Either<NotTimeout<F>, Timeout<F>>;
+
+pub struct NotTimeout<F>
+ where F: Future
+{
+ inner: F,
+}
+
+impl<F> Future for NotTimeout<F>
+ where F: Future
+{
+ type Error = TimeoutError<F::Error>;
+ type Item = F::Item;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ self.inner.poll().map_err(|e| TimeoutError::inner(e))
+ }
+}
/// Commands sent by backend connections to their backend supervisor.
pub enum BackendCommand {
@@ -58,9 +80,10 @@ where
worker: Worker<Vec<QueuedMessage<P::Message>>>,
command_tx: mpsc::UnboundedSender<BackendCommand>,
address: SocketAddr,
+ timeout_ms: u64,
socket: Option<TcpStream>,
- current: Option<P::Future>,
+ current: Option<MaybeTimeout<P::Future>>,
}
impl<P> Future for BackendConnection<P>
@@ -86,7 +109,8 @@ where
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
// On error, we kill ourselves but notify the supervisor first so it can
- // replace us down the line.
+ // replace us down the line. This includes both errors with the underlying
+ // call itself or timing out during the call.
let _ = self.command_tx.unbounded_send(BackendCommand::Error);
return Err(());
},
@@ -102,7 +126,12 @@ where
None => Either::B(TcpStream::connect(&self.address)),
};
- let work = self.processor.process(batch, socket);
+ let inner = self.processor.process(batch, socket);
+ let work = if self.timeout_ms == 0 {
+ Either::A(NotTimeout { inner })
+ } else {
+ Either::B(Timeout::new(inner, Duration::from_millis(self.timeout_ms)))
+ };
self.current = Some(work);
},
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment