Skip to content

Instantly share code, notes, and snippets.

@tobz
Last active April 5, 2018 00:20
Show Gist options
  • Save tobz/e18573b9faecde0f2a1dfb4e8f996624 to your computer and use it in GitHub Desktop.
Save tobz/e18573b9faecde0f2a1dfb4e8f996624 to your computer and use it in GitHub Desktop.
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::IntoFuture>::Item == ()`
--> src/main.rs:143:18
|
143 | .select(close.into_future())
| ^^^^^^ expected tuple, found ()
|
= note: expected type `(std::option::Option<()>, rs_futures_spmc::Receiver<()>)`
found type `()`
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::IntoFuture>::Error == ()`
--> src/main.rs:143:18
|
143 | .select(close.into_future())
| ^^^^^^ expected tuple, found ()
|
= note: expected type `((), rs_futures_spmc::Receiver<()>)`
found type `()`
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::Future>::Item == ()`
--> src/main.rs:143:18
|
143 | .select(close.into_future())
| ^^^^^^ expected tuple, found ()
|
= note: expected type `(std::option::Option<()>, rs_futures_spmc::Receiver<()>)`
found type `()`
= note: required by `futures::Select`
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::Future>::Error == ()`
--> src/main.rs:143:18
|
143 | .select(close.into_future())
| ^^^^^^ expected tuple, found ()
|
= note: expected type `((), rs_futures_spmc::Receiver<()>)`
found type `()`
= note: required by `futures::Select`
error[E0599]: no method named `map` found for type `futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>>` in the current scope
--> src/main.rs:144:18
|
144 | .map(|_| ())
| ^^^
|
= note: the method `map` exists but the following trait bounds were not satisfied:
`futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Future`
`&futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : slog::Drain`
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : slog::Drain`
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Stream`
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Future`
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : std::iter::Iterator`
#![recursion_limit = "1024"]
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate chan;
extern crate chan_signal;
use chan_signal::Signal;
extern crate tokio;
extern crate futures;
extern crate rs_futures_spmc;
extern crate net2;
use rs_futures_spmc::{channel, Receiver};
use tokio::prelude::*;
use tokio::io::copy;
use tokio::reactor::Handle;
#[macro_use]
extern crate log;
#[macro_use(slog_o, slog_kv)]
extern crate slog;
extern crate slog_stdlog;
extern crate slog_scope;
extern crate slog_term;
extern crate slog_async;
use slog::Drain;
use std::thread;
mod config;
mod listener;
mod utils;
use config::Configuration;
use utils::stream_cancelable;
mod errors {
error_chain!{}
}
use errors::*;
fn main() {
// Due to the way signal masking apparently works, or works with this library, we
// must initialize our signal handling code before *any* threads are spun up by
// the process, otherwise we don't seem to get them delivered to us.
//
// We also have this accessory thread because trying to wrap the channel as a stream
// was fraught with pain and this is much simpler. C'est la vie.
let signals = chan_signal::notify(&[Signal::USR1, Signal::USR2]);
let (close_tx, close_rx) = channel::<()>(1);
thread::spawn(move || {
loop {
let signal = signals.recv().unwrap();
debug!("[core] signal received: {:?}", signal);
match signal {
Signal::USR1 => {
// signal to spawn new process
},
Signal::USR2 => {
// signal to close this process
close_tx.send(()).wait();
break;
},
_ => {
// we don't care about the rest
}
}
}
});
// Configure our logging. This gives us fully asynchronous logging to the terminal
// which is also level filtered. As well, we've replaced the global std logger
// and pulled in helper macros that correspond to the various logging levels.
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let logger = slog::Logger::root(
slog::LevelFilter::new(drain, slog::Level::Debug).fuse(),
slog_o!("version" => env!("CARGO_PKG_VERSION")));
let _scope_guard = slog_scope::set_global_logger(logger);
let _log_guard = slog_stdlog::init().unwrap();
info!("[core] logging configured");
if let Err(ref e) = run(close_rx) {
use std::io::Write;
let stderr = &mut ::std::io::stderr();
let errmsg = "Error writing to stderr";
writeln!(stderr, "error: {}", e).expect(errmsg);
for e in e.iter().skip(1) {
writeln!(stderr, "caused by: {}", e).expect(errmsg);
}
if let Some(backtrace) = e.backtrace() {
writeln!(stderr, "backtrace: {:?}", backtrace).expect(errmsg);
}
::std::process::exit(1);
}
}
fn run(close_rx: Receiver::<()>) -> Result<()> {
tokio::run(future::lazy(move || {
// set up our listeners
let configuration = Configuration::from_path("synchrotron.json")
.unwrap();
for pool_config in configuration.pools {
let close = close_rx.clone();
let pool_address = pool_config.pool_address.clone();
let reactor = Handle::current();
let listener = listener::get_listener(&pool_address, &reactor).unwrap();
//let server = stream_cancelable(listener.incoming(), close)
let server = listener.incoming()
.map_err(|e| error!("accept failed = {:?}", e))
.for_each(|sock| {
let (reader, writer) = sock.split();
let bytes_copied = copy(reader, writer);
let handle_conn = bytes_copied.map(|amt| {
info!("wrote {:?} bytes", amt)
}).map_err(|e| {
error!("IO error {:?}", e)
});
tokio::spawn(handle_conn)
})
.select(close.into_future())
.map(|_| ())
.map_err(|_| ())
.then(|| {
info!("[pool] shutting down listener");
future::ok(())
});
tokio::spawn(server);
info!("[pool] listening on {}...", pool_address);
}
Ok(())
}));
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment