Skip to content

Instantly share code, notes, and snippets.

@silvadanilo
Last active February 18, 2017 15:51
Show Gist options
  • Save silvadanilo/3b38d701d5ea5314720c68e2b4450f41 to your computer and use it in GitHub Desktop.
Save silvadanilo/3b38d701d5ea5314720c68e2b4450f41 to your computer and use it in GitHub Desktop.
sink implementation sample
extern crate futures;
extern crate tokio_core;
use futures::{Async, AsyncSink, StartSend, Poll, Sink, IntoFuture, Future, future};
use futures::sync::mpsc::{SendError};
use futures::Async::{NotReady, Ready};
use futures::sync::mpsc::UnboundedReceiver;
use futures::sync::mpsc;
use futures::task::{self, Task};
use tokio_core::reactor::{Core, Timeout};
use std::time::Duration;
#[derive(Clone)]
struct Buffer {
remote_server_is_ready: bool,
remote_tx: Option<mpsc::Sender<String>>,
task: Option<futures::task::Task>,
}
impl Buffer {
fn ready(&mut self) {
self.remote_server_is_ready = true;
println!("READY");
if let Some(ref t) = self.task {
t.unpark();
}
}
}
impl Sink for Buffer {
type SinkItem = String;
type SinkError = SendError<()>;
fn start_send(&mut self, msg: String) -> StartSend<String, SendError<()>> {
if !self.remote_server_is_ready {
self.task = Some(task::park());
println!("NOT READY");
return Ok(AsyncSink::NotReady(msg));
}
println!("{}", msg);
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), SendError<()>> {
println!("complete");
Ok(Async::Ready(()))
}
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let mut buffer = Buffer {
remote_server_is_ready: false,
remote_tx: None,
task: None,
};
let buffer_cloned = buffer.clone();
let t = Timeout::new(Duration::new(5, 0), &handle).into_future().flatten();
let f = t.and_then(move |_| {
buffer.ready();
Ok(())
});
let f = f.map_err(|_| panic!());
handle.spawn(f);
let s = buffer_cloned.send("prova".to_string());
core.run(s).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment