Created
December 24, 2017 11:40
-
-
Save DoumanAsh/f30a5fbd5957b60e246a03e8c30595f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| pub type RequestType = message::request::get::Type; | |
| enum RequestState { | |
| None, | |
| Sent(PendingRequest<RcSender>), | |
| Wait(FutureResponse<RcReceiver>) | |
| } | |
| impl Default for RequestState { | |
| fn default() -> Self { | |
| RequestState::None | |
| } | |
| } | |
| ///Ongoing VNDB request that self recovers from connection loss. | |
| pub struct VndbRequest { | |
| client: Client, | |
| request: message::Request, | |
| state: Cell<RequestState> | |
| } | |
| impl Future for VndbRequest { | |
| type Item = message::Response; | |
| type Error = io::Error; | |
| fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { | |
| match self.client.poll()? { | |
| futures::Async::Ready(_) => (), | |
| futures::Async::NotReady => return Ok(futures::Async::NotReady) | |
| } | |
| let state = unsafe { &*(self.client.state.as_ptr()) }; | |
| let client = match state { | |
| &State::Connected(ref client) => client, | |
| _ => { | |
| warn!("Client poll returned ready, but state is still not connected!"); | |
| return Ok(futures::Async::NotReady); | |
| } | |
| }; | |
| loop { | |
| let state = self.state.take(); | |
| match state { | |
| RequestState::None => { | |
| let send = client.send(self.request.clone()); | |
| self.state.set(RequestState::Sent(send)); | |
| //Loop again to drive IO. | |
| }, | |
| RequestState::Sent(mut pending) => { | |
| match pending.poll() { | |
| Ok(futures::Async::Ready(_)) => { | |
| debug!("VNDB request is sent"); | |
| self.state.set(RequestState::Wait(client.receive())); | |
| //Loop again to drive IO. | |
| }, | |
| Ok(futures::Async::NotReady) => { | |
| self.state.set(RequestState::Sent(pending)); | |
| return Ok(futures::Async::NotReady); | |
| }, | |
| Err(error) => { | |
| warn!("VNDB request failed with error: {}", error); | |
| self.state.set(RequestState::None); | |
| self.client.state.set(State::None); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| } | |
| }, | |
| RequestState::Wait(mut pending) => { | |
| match pending.poll() { | |
| Ok(futures::Async::Ready(result)) => match result { | |
| (Some(rsp), _) => { | |
| debug!("VNDB response has been received"); | |
| return Ok(futures::Async::Ready(rsp)) | |
| }, | |
| (None, _) => { | |
| warn!("VNDB connection is unexpectedly closed. Restart request..."); | |
| self.state.set(RequestState::None); | |
| self.client.state.set(State::None); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| }, | |
| Ok(futures::Async::NotReady) => { | |
| self.state.set(RequestState::Wait(pending)); | |
| return Ok(futures::Async::NotReady); | |
| }, | |
| Err(error) => { | |
| warn!("Failed to get VNDB response. Error: {}", error); | |
| self.state.set(RequestState::None); | |
| self.client.state.set(State::None); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ///State of client. | |
| enum State { | |
| None, | |
| Connecting(PendingConnect<RcSender, RcReceiver>), | |
| Login(RcClient, PendingRequest<RcSender>), | |
| WaitLogin(RcClient, FutureResponse<RcReceiver>), | |
| Connected(RcClient) | |
| } | |
| impl Default for State { | |
| fn default() -> Self { | |
| State::None | |
| } | |
| } | |
| #[derive(Clone)] | |
| ///Wrapper over VNDB client. | |
| pub struct Client { | |
| handle: tokio_core::reactor::Handle, | |
| state: Rc<Cell<State>> | |
| } | |
| impl Client { | |
| pub fn new(handle: tokio_core::reactor::Handle) -> io::Result<Self> { | |
| Ok(Client { | |
| handle: handle.clone(), | |
| state: Rc::new(Cell::new(State::Connecting(RcClient::new(&handle)?))) | |
| }) | |
| } | |
| fn poll(&self) -> futures::Poll<(), io::Error> { | |
| loop { | |
| let state = self.state.take(); | |
| match state { | |
| State::None => { | |
| let new_connection = match RcClient::new(&self.handle) { | |
| Ok(new_connection) => new_connection, | |
| Err(error) => { | |
| error!("Unexpected IO error on re-connecting to VNDB: {}", error); | |
| return Err(error); | |
| } | |
| }; | |
| self.state.set(State::Connecting(new_connection)); | |
| return Ok(futures::Async::NotReady) | |
| }, | |
| State::Connecting(mut pending) => match pending.poll() { | |
| Ok(result) => match result { | |
| futures::Async::Ready(new_client) => { | |
| info!("Connected to VNDB"); | |
| let login = new_client.send(message::request::Login::new(None, None)); | |
| self.state.set(State::Login(new_client, login)); | |
| //Loop again to drive IO | |
| }, | |
| futures::Async::NotReady => { | |
| debug!("Waiting for connection VNDB"); | |
| self.state.set(State::Connecting(pending)); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| }, | |
| Err(error) => { | |
| error!("Connecting to VNDB failed. Error: {}", error); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| }, | |
| State::Login(client, mut pending_login) => match pending_login.poll()? { | |
| futures::Async::Ready(_) => { | |
| info!("Sent login to VNDB"); | |
| let login_rsp = client.receive(); | |
| self.state.set(State::WaitLogin(client, login_rsp)); | |
| //Loop again to drive IO | |
| }, | |
| _ => { | |
| debug!("Waiting send VNDB request"); | |
| self.state.set(State::Login(client, pending_login)); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| }, | |
| State::WaitLogin(client, mut login_response) => match login_response.poll()? { | |
| futures::Async::Ready((Some(message::Response::Ok), _)) => { | |
| info!("Successfully connected to VNDB"); | |
| self.state.set(State::Connected(client)); | |
| return Ok(futures::Async::Ready(())) | |
| }, | |
| futures::Async::Ready((Some(rsp), _)) => { | |
| error!("Failed to login. Error: {:?}", rsp); | |
| let login = client.send(message::request::Login::new(None, None)); | |
| self.state.set(State::Login(client, login)); | |
| //Loop again to drive IO | |
| }, | |
| futures::Async::Ready((None, _)) => { | |
| warn!("VNDB connection is unexpectedly closed. Restart connecting..."); | |
| //Loop again to drive IO | |
| } | |
| _ => { | |
| debug!("Waiting for VNDB auth"); | |
| self.state.set(State::WaitLogin(client, login_response)); | |
| return Ok(futures::Async::NotReady) | |
| } | |
| }, | |
| State::Connected(client) => { | |
| self.state.set(State::Connected(client)); | |
| return Ok(futures::Async::Ready(())) | |
| } | |
| } | |
| } | |
| } | |
| pub fn get_by_id(&self, kind: message::request::get::Type, id: u64) -> VndbRequest { | |
| let get: message::Request = message::request::Get { | |
| kind, | |
| flags: message::request::get::Flags::new().basic(), | |
| filters: message::request::get::Filters::new().filter(format_args!("id = {}", id)), | |
| options: None | |
| }.into(); | |
| VndbRequest { | |
| client: self.clone(), | |
| request: get.clone(), | |
| state: Cell::new(RequestState::None) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment