Skip to content

Instantly share code, notes, and snippets.

@DoumanAsh
Created December 24, 2017 11:40
Show Gist options
  • Select an option

  • Save DoumanAsh/f30a5fbd5957b60e246a03e8c30595f0 to your computer and use it in GitHub Desktop.

Select an option

Save DoumanAsh/f30a5fbd5957b60e246a03e8c30595f0 to your computer and use it in GitHub Desktop.
pub type RequestType = message::request::get::Type;
enum RequestState {
None,
Sent(PendingRequest<RcSender>),
Wait(FutureResponse<RcReceiver>)
}
impl Default for RequestState {
fn default() -> Self {
RequestState::None
}
}
///Ongoing VNDB request that self recovers from connection loss.
pub struct VndbRequest {
client: Client,
request: message::Request,
state: Cell<RequestState>
}
impl Future for VndbRequest {
type Item = message::Response;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
match self.client.poll()? {
futures::Async::Ready(_) => (),
futures::Async::NotReady => return Ok(futures::Async::NotReady)
}
let state = unsafe { &*(self.client.state.as_ptr()) };
let client = match state {
&State::Connected(ref client) => client,
_ => {
warn!("Client poll returned ready, but state is still not connected!");
return Ok(futures::Async::NotReady);
}
};
loop {
let state = self.state.take();
match state {
RequestState::None => {
let send = client.send(self.request.clone());
self.state.set(RequestState::Sent(send));
//Loop again to drive IO.
},
RequestState::Sent(mut pending) => {
match pending.poll() {
Ok(futures::Async::Ready(_)) => {
debug!("VNDB request is sent");
self.state.set(RequestState::Wait(client.receive()));
//Loop again to drive IO.
},
Ok(futures::Async::NotReady) => {
self.state.set(RequestState::Sent(pending));
return Ok(futures::Async::NotReady);
},
Err(error) => {
warn!("VNDB request failed with error: {}", error);
self.state.set(RequestState::None);
self.client.state.set(State::None);
return Ok(futures::Async::NotReady)
}
}
},
RequestState::Wait(mut pending) => {
match pending.poll() {
Ok(futures::Async::Ready(result)) => match result {
(Some(rsp), _) => {
debug!("VNDB response has been received");
return Ok(futures::Async::Ready(rsp))
},
(None, _) => {
warn!("VNDB connection is unexpectedly closed. Restart request...");
self.state.set(RequestState::None);
self.client.state.set(State::None);
return Ok(futures::Async::NotReady)
}
},
Ok(futures::Async::NotReady) => {
self.state.set(RequestState::Wait(pending));
return Ok(futures::Async::NotReady);
},
Err(error) => {
warn!("Failed to get VNDB response. Error: {}", error);
self.state.set(RequestState::None);
self.client.state.set(State::None);
return Ok(futures::Async::NotReady)
}
}
}
}
}
}
}
///State of client.
enum State {
None,
Connecting(PendingConnect<RcSender, RcReceiver>),
Login(RcClient, PendingRequest<RcSender>),
WaitLogin(RcClient, FutureResponse<RcReceiver>),
Connected(RcClient)
}
impl Default for State {
fn default() -> Self {
State::None
}
}
#[derive(Clone)]
///Wrapper over VNDB client.
pub struct Client {
handle: tokio_core::reactor::Handle,
state: Rc<Cell<State>>
}
impl Client {
pub fn new(handle: tokio_core::reactor::Handle) -> io::Result<Self> {
Ok(Client {
handle: handle.clone(),
state: Rc::new(Cell::new(State::Connecting(RcClient::new(&handle)?)))
})
}
fn poll(&self) -> futures::Poll<(), io::Error> {
loop {
let state = self.state.take();
match state {
State::None => {
let new_connection = match RcClient::new(&self.handle) {
Ok(new_connection) => new_connection,
Err(error) => {
error!("Unexpected IO error on re-connecting to VNDB: {}", error);
return Err(error);
}
};
self.state.set(State::Connecting(new_connection));
return Ok(futures::Async::NotReady)
},
State::Connecting(mut pending) => match pending.poll() {
Ok(result) => match result {
futures::Async::Ready(new_client) => {
info!("Connected to VNDB");
let login = new_client.send(message::request::Login::new(None, None));
self.state.set(State::Login(new_client, login));
//Loop again to drive IO
},
futures::Async::NotReady => {
debug!("Waiting for connection VNDB");
self.state.set(State::Connecting(pending));
return Ok(futures::Async::NotReady)
}
},
Err(error) => {
error!("Connecting to VNDB failed. Error: {}", error);
return Ok(futures::Async::NotReady)
}
},
State::Login(client, mut pending_login) => match pending_login.poll()? {
futures::Async::Ready(_) => {
info!("Sent login to VNDB");
let login_rsp = client.receive();
self.state.set(State::WaitLogin(client, login_rsp));
//Loop again to drive IO
},
_ => {
debug!("Waiting send VNDB request");
self.state.set(State::Login(client, pending_login));
return Ok(futures::Async::NotReady)
}
},
State::WaitLogin(client, mut login_response) => match login_response.poll()? {
futures::Async::Ready((Some(message::Response::Ok), _)) => {
info!("Successfully connected to VNDB");
self.state.set(State::Connected(client));
return Ok(futures::Async::Ready(()))
},
futures::Async::Ready((Some(rsp), _)) => {
error!("Failed to login. Error: {:?}", rsp);
let login = client.send(message::request::Login::new(None, None));
self.state.set(State::Login(client, login));
//Loop again to drive IO
},
futures::Async::Ready((None, _)) => {
warn!("VNDB connection is unexpectedly closed. Restart connecting...");
//Loop again to drive IO
}
_ => {
debug!("Waiting for VNDB auth");
self.state.set(State::WaitLogin(client, login_response));
return Ok(futures::Async::NotReady)
}
},
State::Connected(client) => {
self.state.set(State::Connected(client));
return Ok(futures::Async::Ready(()))
}
}
}
}
pub fn get_by_id(&self, kind: message::request::get::Type, id: u64) -> VndbRequest {
let get: message::Request = message::request::Get {
kind,
flags: message::request::get::Flags::new().basic(),
filters: message::request::get::Filters::new().filter(format_args!("id = {}", id)),
options: None
}.into();
VndbRequest {
client: self.clone(),
request: get.clone(),
state: Cell::new(RequestState::None)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment