Created
October 4, 2018 17:07
-
-
Save tobz/81e4301e1f2b8bcc1935b7ca767f9c9c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/backend/backend.rs b/src/backend/backend.rs | |
index fc64df4..c2fbe40 100644 | |
--- a/src/backend/backend.rs | |
+++ b/src/backend/backend.rs | |
@@ -25,12 +25,34 @@ use futures::{ | |
future::{ok, Either, Shared}, | |
prelude::*, | |
sync::mpsc, | |
+ Poll, | |
}; | |
use futures_turnstyle::Waiter; | |
use protocol::errors::ProtocolError; | |
-use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc}; | |
-use tokio::net::TcpStream; | |
+use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; | |
+use tokio::net::tcp::TcpStream; | |
use util::{WorkQueue, Worker}; | |
+use tokio::timer::Timeout; | |
+use tokio::timer::timeout::Error as TimeoutError; | |
+ | |
+type MaybeTimeout<F> = Either<NotTimeout<F>, Timeout<F>>; | |
+ | |
+/// Pass-through wrapper around a future `F`; built as the `Either::A` arm of | |
+/// `MaybeTimeout` when `timeout_ms` is zero, i.e. when no `Timeout` is wanted. | |
+pub struct NotTimeout<F: Future> { | |
+    inner: F, | |
+} | |
+ | |
+/// Yields the wrapped future's item; its errors are wrapped via `TimeoutError::inner`. | |
+impl<F: Future> Future for NotTimeout<F> { | |
+    type Item = F::Item; | |
+    type Error = TimeoutError<F::Error>; | |
+ | |
+    // Delegates straight to `inner`: only the error type changes, no deadline is applied. | |
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
+        self.inner.poll().map_err(TimeoutError::inner) | |
+    } | |
+} | |
/// Commands sent by backend connections to their backend supervisor. | |
pub enum BackendCommand { | |
@@ -58,9 +80,10 @@ where | |
worker: Worker<Vec<QueuedMessage<P::Message>>>, | |
command_tx: mpsc::UnboundedSender<BackendCommand>, | |
address: SocketAddr, | |
+ timeout_ms: u64, | |
socket: Option<TcpStream>, | |
- current: Option<P::Future>, | |
+ current: Option<MaybeTimeout<P::Future>>, | |
} | |
impl<P> Future for BackendConnection<P> | |
@@ -86,7 +109,8 @@ where | |
Ok(Async::NotReady) => return Ok(Async::NotReady), | |
Err(_) => { | |
// On error, we kill ourselves but notify the supervisor first so it can | |
- // replace us down the line. | |
+ // replace us down the line. This covers both errors from the underlying | |
+ // call itself and timeouts that elapse during the call. | |
let _ = self.command_tx.unbounded_send(BackendCommand::Error); | |
return Err(()); | |
}, | |
@@ -102,7 +126,12 @@ where | |
None => Either::B(TcpStream::connect(&self.address)), | |
}; | |
- let work = self.processor.process(batch, socket); | |
+ let inner = self.processor.process(batch, socket); | |
+ let work = if self.timeout_ms == 0 { | |
+ Either::A(NotTimeout { inner }) | |
+ } else { | |
+ Either::B(Timeout::new(inner, Duration::from_millis(self.timeout_ms))) | |
+ }; | |
self.current = Some(work); | |
}, | |
Ok(Async::Ready(None)) => return Ok(Async::Ready(())), |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment