Skip to content

Instantly share code, notes, and snippets.

@TheWaWaR
Last active December 22, 2016 14:21
Show Gist options
  • Save TheWaWaR/c01cb20620a5246f17451601b40683d0 to your computer and use it in GitHub Desktop.
Save TheWaWaR/c01cb20620a5246f17451601b40683d0 to your computer and use it in GitHub Desktop.
extern crate threadpool;
extern crate futures;
extern crate tokio_core;
use std::env;
use std::thread;
use std::net::SocketAddr;
use futures::Sink;
use futures::sync::mpsc::{channel, Receiver};
use futures::{Future};
use futures::stream::Stream;
use tokio_core::io::{copy, Io};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Core};
fn start_loop(rx: Receiver<(TcpStream, SocketAddr)>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let done = rx.for_each(move |(socket, addr)| {
println!("[Start for]: {:?}", addr);
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
let msg = amt.then(move |result| {
match result {
Ok(amt) => println!("wrote {} bytes to {}", amt, addr),
Err(e) => println!("error on {}: {}", addr, e),
}
Ok(())
});
println!("[Spawn for]: {:?}", addr);
handle.spawn(msg);
Ok(())
});
core.run(done).unwrap();
}
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let (tx, rx) = channel::<(TcpStream, SocketAddr)>(100);
thread::spawn(move || {
start_loop(rx);
});
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming().for_each(move |(socket, addr)| {
let _ = tx.clone().send((socket, addr));
Ok(())
});
core.run(done).unwrap();
}
extern crate threadpool;
extern crate futures;
extern crate tokio_core;
use std::env;
use std::thread;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, RecvError};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use tokio_core::io::{copy, Io};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Core};
struct Acceptor {
rx: Receiver<(TcpStream, SocketAddr)>,
received_once: bool
}
impl Acceptor {
fn new(rx: Receiver<(TcpStream, SocketAddr)>) -> Acceptor {
Acceptor{rx: rx, received_once: false}
}
}
impl Stream for Acceptor {
type Item = (TcpStream, SocketAddr);
type Error = RecvError;
fn poll(&mut self) -> Poll<Option<(TcpStream, SocketAddr)>, RecvError> {
if self.received_once {
return Ok(Async::NotReady);
}
let (socket, addr) = self.rx.recv().unwrap();
self.received_once = true;
Ok(Async::Ready(Some((socket, addr))))
}
}
fn start_loop(rx: Receiver<(TcpStream, SocketAddr)>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let acceptor = Acceptor::new(rx);
let done = acceptor.for_each(move |(socket, addr)| {
println!("[Start for]: {:?}", addr);
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
let msg = amt.then(move |result| {
match result {
Ok(amt) => println!("wrote {} bytes to {}", amt, addr),
Err(e) => println!("error on {}: {}", addr, e),
}
Ok(())
});
println!("[Spawn for]: {:?}", addr);
handle.spawn(msg);
Ok(())
});
core.run(done).unwrap();
}
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let (tx, rx) = channel::<(TcpStream, SocketAddr)>();
thread::spawn(move || {
start_loop(rx);
});
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming().for_each(move |(socket, addr)| {
let _ = tx.clone().send((socket, addr));
Ok(())
});
core.run(done).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment