Last active
April 5, 2018 00:20
-
-
Save tobz/e18573b9faecde0f2a1dfb4e8f996624 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::IntoFuture>::Item == ()` | |
--> src/main.rs:143:18 | |
| | |
143 | .select(close.into_future()) | |
| ^^^^^^ expected tuple, found () | |
| | |
= note: expected type `(std::option::Option<()>, rs_futures_spmc::Receiver<()>)` | |
found type `()` | |
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::IntoFuture>::Error == ()` | |
--> src/main.rs:143:18 | |
| | |
143 | .select(close.into_future()) | |
| ^^^^^^ expected tuple, found () | |
| | |
= note: expected type `((), rs_futures_spmc::Receiver<()>)` | |
found type `()` | |
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::Future>::Item == ()` | |
--> src/main.rs:143:18 | |
| | |
143 | .select(close.into_future()) | |
| ^^^^^^ expected tuple, found () | |
| | |
= note: expected type `(std::option::Option<()>, rs_futures_spmc::Receiver<()>)` | |
found type `()` | |
= note: required by `futures::Select` | |
error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>> as futures::Future>::Error == ()` | |
--> src/main.rs:143:18 | |
| | |
143 | .select(close.into_future()) | |
| ^^^^^^ expected tuple, found () | |
| | |
= note: expected type `((), rs_futures_spmc::Receiver<()>)` | |
found type `()` | |
= note: required by `futures::Select` | |
error[E0599]: no method named `map` found for type `futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>>` in the current scope | |
--> src/main.rs:144:18 | |
| | |
144 | .map(|_| ()) | |
| ^^^ | |
| | |
= note: the method `map` exists but the following trait bounds were not satisfied: | |
`futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Future` | |
`&futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : slog::Drain` | |
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : slog::Drain` | |
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Stream` | |
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : futures::Future` | |
`&mut futures::Select<futures::stream::ForEach<futures::stream::MapErr<tokio::net::Incoming, [closure@src/main.rs:131:26: 131:63]>, [closure@src/main.rs:132:27: 142:18], tokio::executor::Spawn>, futures::stream::StreamFuture<rs_futures_spmc::Receiver<()>>> : std::iter::Iterator` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![recursion_limit = "1024"] | |
#[macro_use] | |
extern crate error_chain; | |
#[macro_use] | |
extern crate serde_derive; | |
extern crate serde; | |
extern crate serde_json; | |
#[macro_use] | |
extern crate chan; | |
extern crate chan_signal; | |
use chan_signal::Signal; | |
extern crate tokio; | |
extern crate futures; | |
extern crate rs_futures_spmc; | |
extern crate net2; | |
use rs_futures_spmc::{channel, Receiver}; | |
use tokio::prelude::*; | |
use tokio::io::copy; | |
use tokio::reactor::Handle; | |
#[macro_use] | |
extern crate log; | |
#[macro_use(slog_o, slog_kv)] | |
extern crate slog; | |
extern crate slog_stdlog; | |
extern crate slog_scope; | |
extern crate slog_term; | |
extern crate slog_async; | |
use slog::Drain; | |
use std::thread; | |
mod config; | |
mod listener; | |
mod utils; | |
use config::Configuration; | |
use utils::stream_cancelable; | |
mod errors { | |
error_chain!{} | |
} | |
use errors::*; | |
fn main() { | |
// Due to the way signal masking apparently works, or works with this library, we | |
// must initialize our signal handling code before *any* threads are spun up by | |
// the process, otherwise we don't seem to get them delivered to us. | |
// | |
// We also have this accessory thread because trying to wrap the channel as a stream | |
// was fraught with pain and this is much simpler. C'est la vie. | |
let signals = chan_signal::notify(&[Signal::USR1, Signal::USR2]); | |
let (close_tx, close_rx) = channel::<()>(1); | |
thread::spawn(move || { | |
loop { | |
let signal = signals.recv().unwrap(); | |
debug!("[core] signal received: {:?}", signal); | |
match signal { | |
Signal::USR1 => { | |
// signal to spawn new process | |
}, | |
Signal::USR2 => { | |
// signal to close this process | |
close_tx.send(()).wait(); | |
break; | |
}, | |
_ => { | |
// we don't care about the rest | |
} | |
} | |
} | |
}); | |
// Configure our logging. This gives us fully asynchronous logging to the terminal | |
// which is also level filtered. As well, we've replaced the global std logger | |
// and pulled in helper macros that correspond to the various logging levels. | |
let decorator = slog_term::TermDecorator::new().build(); | |
let drain = slog_term::FullFormat::new(decorator).build().fuse(); | |
let drain = slog_async::Async::new(drain).build().fuse(); | |
let logger = slog::Logger::root( | |
slog::LevelFilter::new(drain, slog::Level::Debug).fuse(), | |
slog_o!("version" => env!("CARGO_PKG_VERSION"))); | |
let _scope_guard = slog_scope::set_global_logger(logger); | |
let _log_guard = slog_stdlog::init().unwrap(); | |
info!("[core] logging configured"); | |
if let Err(ref e) = run(close_rx) { | |
use std::io::Write; | |
let stderr = &mut ::std::io::stderr(); | |
let errmsg = "Error writing to stderr"; | |
writeln!(stderr, "error: {}", e).expect(errmsg); | |
for e in e.iter().skip(1) { | |
writeln!(stderr, "caused by: {}", e).expect(errmsg); | |
} | |
if let Some(backtrace) = e.backtrace() { | |
writeln!(stderr, "backtrace: {:?}", backtrace).expect(errmsg); | |
} | |
::std::process::exit(1); | |
} | |
} | |
fn run(close_rx: Receiver::<()>) -> Result<()> { | |
tokio::run(future::lazy(move || { | |
// set up our listeners | |
let configuration = Configuration::from_path("synchrotron.json") | |
.unwrap(); | |
for pool_config in configuration.pools { | |
let close = close_rx.clone(); | |
let pool_address = pool_config.pool_address.clone(); | |
let reactor = Handle::current(); | |
let listener = listener::get_listener(&pool_address, &reactor).unwrap(); | |
//let server = stream_cancelable(listener.incoming(), close) | |
let server = listener.incoming() | |
.map_err(|e| error!("accept failed = {:?}", e)) | |
.for_each(|sock| { | |
let (reader, writer) = sock.split(); | |
let bytes_copied = copy(reader, writer); | |
let handle_conn = bytes_copied.map(|amt| { | |
info!("wrote {:?} bytes", amt) | |
}).map_err(|e| { | |
error!("IO error {:?}", e) | |
}); | |
tokio::spawn(handle_conn) | |
}) | |
.select(close.into_future()) | |
.map(|_| ()) | |
.map_err(|_| ()) | |
.then(|| { | |
info!("[pool] shutting down listener"); | |
future::ok(()) | |
}); | |
tokio::spawn(server); | |
info!("[pool] listening on {}...", pool_address); | |
} | |
Ok(()) | |
})); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment