/// A cloneable kill flag.
///
/// Every clone shares the same underlying atomic flag as the value it was
/// cloned from, so a single call to [`KillSwitch::kill`] is observed by all
/// clones that were created before that call.
#[derive(Clone)]
pub struct KillSwitch {
    /// Flag shared by all clones of the current generation; `true` once that
    /// generation has been killed.
    kill: Arc<AtomicBool>,
}

impl KillSwitch {
    /// Creates a switch that has not been killed.
    pub fn new() -> Self {
        KillSwitch {
            kill: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Kills all existing clones of this `KillSwitch`.
    ///
    /// After marking the shared flag, `self` is re-armed with a brand-new
    /// flag. Consequently `self.is_killed()` is `false` again after this call,
    /// and clones taken from `self` afterwards are not killed; only clones
    /// taken *before* the call report `true`.
    pub fn kill(&mut self) {
        // `Relaxed` is enough here: the flag is the only shared state, no
        // other memory is published through it.
        self.kill.store(true, Ordering::Relaxed);
        // Start a new generation so later clones are unaffected.
        self.kill = Arc::new(AtomicBool::new(false));
    }

    /// Returns `true` if the flag this instance shares has been killed.
    pub fn is_killed(&self) -> bool {
        self.kill.load(Ordering::Relaxed)
    }
}

impl Default for KillSwitch {
    /// Equivalent to [`KillSwitch::new`]: a switch that has not been killed.
    fn default() -> Self {
        Self::new()
    }
}

/// Wraps a served connection together with the state needed to shut it down:
/// a kill switch, a periodic timer and a tick counter.
pub struct ConnWrapper {
    /// The wrapped connection; `None` once the wrapper has dropped it.
    conn: Option<Connection<TcpStream, Service>>,
    /// Clone of the caller's kill switch, checked while polling.
    kill: KillSwitch,
    /// Periodic timer; starts with a 1000 ms period.
    timeout: Interval,
    /// Starts `false`; once set, the next tick of `timeout` drops the
    /// connection.
    kill_on_timeout: bool,
    /// Number of `timeout` ticks counted so far.
    timeouts: usize,
}

impl ConnWrapper {
    /// Wraps `conn`, attaching a clone of `kill` and a fresh 1000 ms interval.
    ///
    /// The wrapper starts with a zero tick count and is not yet in the
    /// "kill on next tick" state.
    pub fn wrap(conn: Connection<TcpStream, Service>, kill: &KillSwitch) -> Self {
        let switch = kill.clone();
        let period = Duration::from_millis(1000);
        let ticker = Interval::new_interval(period);

        Self {
            conn: Some(conn),
            kill: switch,
            timeout: ticker,
            kill_on_timeout: false,
            timeouts: 0,
        }
    }
}
impl Future for ConnWrapper {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Result<Async<()>, ()> {
        match self.timeout.poll() {
            Ok(Async::NotReady) => {},
            Ok(Async::Ready(_)) => {
                if self.kill_on_timeout {
                    self.conn = None;
                    return Ok(Async::Ready(()));
                } else {
                    task::current().notify();
                    self.timeouts += 1;
                }
            },
            Err(err) => {
                error!("timer error: {}", err);
                self.conn = None;
                return Err(());
            },
        }
        if !self.kill_on_timeout {
            if self.timeouts > 1000 || self.kill.is_killed() {
                /// We should kill this connection. Start a 100 ms timeout and kill
                /// it if it doesn't finish before the timeout.
                self.timeout = Interval::new_interval(Duration::from_millis(100));
                self.kill_on_timeout = true;
                if let Some(conn) = &mut self.conn {
                    conn.graceful_shutdown();
                }
                task::current().notify();
                return Ok(Async::NotReady);
            }
        }
        match &mut self.conn {
            Some(conn) => match conn.poll() {
                Ok(Async::NotReady) => Ok(Async::NotReady),
                Ok(Async::Ready(())) => Ok(Async::Ready(())),
                Err(err) => {
                    if err.is_closed() || err.is_canceled() || err.is_connect() {
                        /// Just ignore it if the client closes the connection.
                        Ok(Async::Ready(()))
                    } else {
                        if let Some(inner) = err.cause2() {
                            if let Some(ioerr) = inner.downcast_ref::<std::io::Error>() {
                                let kind = ioerr.kind();
                                if kind == ErrorKind::ConnectionReset {
                                    /// Just ignore it if the client closes the connection.
                                    return Ok(Async::Ready(()));
                                } else if kind == ErrorKind::ConnectionAborted {
                                    /// Just ignore it if the client closes the connection.
                                    return Ok(Async::Ready(()));
                                } else {
                                    error!("socket error: {} (kind: {:?})", err, kind);
                                    return Err(());
                                }
                            }
                        }
                        error!("socket error: {}", err);
                        Err(())
                    }
                },
            },
            None => {
                Ok(Async::Ready(()))
            },
        }
    }
}