Created
February 13, 2019 06:24
-
-
Save shinnya/b8a69f124f0ec64ffae9237dbd67576f to your computer and use it in GitHub Desktop.
An example of CompletableFuture with fibers-rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use fibers::{sync::oneshot, time::timer}; | |
use futures::{Async, Future, Poll, Select}; | |
use futures::future::Either; | |
use std::marker::PhantomData; | |
use std::time::Duration; | |
// your custom error type. | |
use Error; | |
type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send + 'static>; | |
/// This struct allows that one can finish a `Future` by calling `complete`. | |
pub struct CompletableFuture<S> { | |
/// A phantom type. | |
phantom: PhantomData<S>, | |
/// `Monitored` used to send a completed signal to the inner future. | |
monitored: Vec<oneshot::Monitored<(), Error>>, | |
/// A future that one would like to control. | |
future: Select<BoxFuture<(), Error>, BoxFuture<(), Error>>, | |
} | |
impl<S> CompletableFuture<S> | |
where | |
S: Future + Send + 'static, | |
S::Error: Into<Error>, | |
{ | |
/// Creates a new instance. | |
pub fn new(src: S) -> Self { | |
let (monitored, monitor) = oneshot::monitor(); | |
let future1: BoxFuture<(), Error> = Box::new(monitor.map_err(|e| track!(Error::from(e)))); | |
let future2: BoxFuture<(), Error> = Box::new(src.map(|_| ()).map_err(|e| track!(e.into()))); | |
Self { | |
phantom: PhantomData, | |
monitored: vec![monitored], | |
future: future1.select(future2) | |
} | |
} | |
/// Completes the inner future. | |
pub fn complete(&mut self) { | |
for monitored in self.monitored.drain(..) { | |
monitored.exit(Ok(())); | |
} | |
} | |
} | |
impl<S> Future for CompletableFuture<S> { | |
type Item = (); | |
type Error = Error; | |
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
match self.future.poll() { | |
Err((e, _)) => Err(track!(e)), | |
Ok(Async::NotReady) => Ok(Async::NotReady), | |
Ok(Async::Ready(((), _))) => Ok(Async::Ready(())), | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment