#[derive(Clone)] pub struct KillSwitch { kill: Arc<AtomicBool>, } impl KillSwitch { pub fn new() -> Self { KillSwitch { kill: Arc::new(AtomicBool::new(false)), } } /// Kill all clones of this KillSwitch. Note that this replaces self /// with a new KillSwitch, meaning that futher calls to `clone` won't /// contain a killed connection. pub fn kill(&mut self) { self.kill.store(true, Ordering::Relaxed); self.kill = Arc::new(AtomicBool::new(false)); } pub fn is_killed(&self) -> bool { self.kill.load(Ordering::Relaxed) } } pub struct ConnWrapper { conn: Option<Connection<TcpStream, Service>>, kill: KillSwitch, timeout: Interval, kill_on_timeout: bool, timeouts: usize, } impl ConnWrapper { pub fn wrap(conn: Connection<TcpStream, Service>, kill: &KillSwitch) -> Self { ConnWrapper { conn: Some(conn), kill: kill.clone(), kill_on_timeout: false, timeout: Interval::new_interval(Duration::from_millis(1000)), timeouts: 0, } } } impl Future for ConnWrapper { type Item = (); type Error = (); fn poll(&mut self) -> Result<Async<()>, ()> { match self.timeout.poll() { Ok(Async::NotReady) => {}, Ok(Async::Ready(_)) => { if self.kill_on_timeout { self.conn = None; return Ok(Async::Ready(())); } else { task::current().notify(); self.timeouts += 1; } }, Err(err) => { error!("timer error: {}", err); self.conn = None; return Err(()); }, } if !self.kill_on_timeout { if self.timeouts > 1000 || self.kill.is_killed() { /// We should kill this connection. Start a 100 ms timeout and kill /// it if it doesn't finish before the timeout. self.timeout = Interval::new_interval(Duration::from_millis(100)); self.kill_on_timeout = true; if let Some(conn) = &mut self.conn { conn.graceful_shutdown(); } task::current().notify(); return Ok(Async::NotReady); } } match &mut self.conn { Some(conn) => match conn.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(())) => Ok(Async::Ready(())), Err(err) => { if err.is_closed() || err.is_canceled() || err.is_connect() { /// Just ignore it if the client closes the connection. Ok(Async::Ready(())) } else { if let Some(inner) = err.cause2() { if let Some(ioerr) = inner.downcast_ref::<std::io::Error>() { let kind = ioerr.kind(); if kind == ErrorKind::ConnectionReset { /// Just ignore it if the client closes the connection. return Ok(Async::Ready(())); } else if kind == ErrorKind::ConnectionAborted { /// Just ignore it if the client closes the connection. return Ok(Async::Ready(())); } else { error!("socket error: {} (kind: {:?})", err, kind); return Err(()); } } } error!("socket error: {}", err); Err(()) } }, }, None => { Ok(Async::Ready(())) }, } } }