Skip to content

Instantly share code, notes, and snippets.

@shinnya
Created February 13, 2019 06:24
Show Gist options
  • Save shinnya/b8a69f124f0ec64ffae9237dbd67576f to your computer and use it in GitHub Desktop.
Save shinnya/b8a69f124f0ec64ffae9237dbd67576f to your computer and use it in GitHub Desktop.
An example of CompletableFuture with fibers-rs
use fibers::{sync::oneshot, time::timer};
use futures::{Async, Future, Poll, Select};
use futures::future::Either;
use std::marker::PhantomData;
use std::time::Duration;
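// Your crate's own error type. `CompletableFuture::new` converts the monitor's
// error into it with `Error::from`, so a matching `From` impl must exist.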
use Error;
type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send + 'static>;
/// This struct allows that one can finish a `Future` by calling `complete`.
pub struct CompletableFuture<S> {
/// A phantom type.
phantom: PhantomData<S>,
/// `Monitored` used to send a completed signal to the inner future.
monitored: Vec<oneshot::Monitored<(), Error>>,
/// A future that one would like to control.
future: Select<BoxFuture<(), Error>, BoxFuture<(), Error>>,
}
impl<S> CompletableFuture<S>
where
S: Future + Send + 'static,
S::Error: Into<Error>,
{
/// Creates a new instance.
pub fn new(src: S) -> Self {
let (monitored, monitor) = oneshot::monitor();
let future1: BoxFuture<(), Error> = Box::new(monitor.map_err(|e| track!(Error::from(e))));
let future2: BoxFuture<(), Error> = Box::new(src.map(|_| ()).map_err(|e| track!(e.into())));
Self {
phantom: PhantomData,
monitored: vec![monitored],
future: future1.select(future2)
}
}
/// Completes the inner future.
pub fn complete(&mut self) {
for monitored in self.monitored.drain(..) {
monitored.exit(Ok(()));
}
}
}
/// Polls the inner `Select` and strips the leftover half from its result.
impl<S> Future for CompletableFuture<S> {
    /// Nothing is yielded; only the fact of completion is reported.
    type Item = ();
    /// Failures from either raced future, tracked at this call site.
    type Error = Error;

    /// Forwards to the inner `Select`.
    ///
    /// `Select` hands back the not-yet-finished future alongside both its
    /// success and failure values; that leftover is dropped here so callers
    /// only see `()` or an `Error`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let outcome = self.future.poll();
        let ((), _leftover) = match outcome {
            Ok(Async::Ready(finished)) => finished,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err((e, _leftover)) => return Err(track!(e)),
        };
        Ok(Async::Ready(()))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment