Skip to content

Instantly share code, notes, and snippets.

@cspinetta
Created June 25, 2017 21:58
Show Gist options
  • Save cspinetta/266d238d605d4347e3aecf86e0e6cca2 to your computer and use it in GitHub Desktop.
Save cspinetta/266d238d605d4347e3aecf86e0e6cca2 to your computer and use it in GitHub Desktop.
Currently fails to compile.
[package]
name = "toy-load-balancer"
version = "0.1.0"
authors = ["Cristian Spinetta <[email protected]>"]
[dependencies]
log = "0.3"
#env_logger="0.4.3"
pretty_env_logger = "0.1"
hyper = "0.11"
# basic dependencies from echo server before
futures = "0.1"
tokio-core = "0.1"
tokio-proto = "0.1"
tokio-service = "0.1"
tokio-pool = "0.1"
# our toy HTTP implementation
tokio-minihttp = { git = "https://github.com/tokio-rs/tokio-minihttp" }
# json
serde_derive = "1.0"
serde = "1.0"
serde_json = "1.0"
# misc support for thread pools, random numbers
futures-cpupool = "0.1"
rand = "0.3"
#[macro_use] extern crate log;
extern crate pretty_env_logger;
extern crate hyper;
extern crate futures;
extern crate tokio_core;
extern crate tokio_pool;
use hyper::header::ContentLength;
use hyper::server::{Http, Request, Response, Service};
use hyper::{Method, StatusCode};
use std::ascii::AsciiExt;
use futures::{Stream, Future};
use hyper::{Body, Chunk};
use futures::future;
use futures::future::{Either, Map, FutureResult};
use futures::stream::Concat2;
use std::io::{self, Write};
use hyper::Client;
use tokio_core::reactor::Core;
use tokio_core::reactor::Handle;
use tokio_core::net::TcpListener;
use hyper::Uri;
use hyper::error::UriError;
use tokio_pool::TokioPool;
use std::sync::Arc;
use std::net::SocketAddr;
use std::cell::RefCell;
use std::mem;
use std::borrow::Borrow;
/// Starts the proxy: creates a pool of four event loops, binds a TCP listener
/// on `0.0.0.0:8080` on one worker, and hands every accepted connection to the
/// next worker in the pool, where it is served by the shared `Proxy` service.
///
/// Panics if the logger, the event-loop pool, the listen address, or the
/// listener itself cannot be set up. Blocks until the pool's threads finish.
fn main() {
    pretty_env_logger::init().unwrap();
    let (pool, join) = TokioPool::new(4)
        .expect("Failed to create event loop");
    let addr: SocketAddr = "0.0.0.0:8080".parse().unwrap();
    let pool: Arc<TokioPool> = Arc::new(pool);
    let service: Arc<Proxy> = Arc::new(Proxy { pool_ref: pool.clone() });
    let pool_ref: Arc<TokioPool> = pool.clone();
    let http: Arc<Http<Chunk>> = Arc::new(Http::new());
    pool.next_worker().spawn(move |handle| {
        let listener = TcpListener::bind(&addr, handle)
            .expect("Failed to bind TCP listener");
        listener.incoming().for_each(move |(socket, peer_addr)| {
            // This closure runs once per accepted connection, so it must not
            // move its captured `http`/`service` into the spawned task. Give
            // each connection its own cheap `Arc` clones instead.
            let http = http.clone();
            let service = service.clone();
            pool_ref.next_worker().spawn(move |handle| {
                http.bind_connection(handle, socket, peer_addr, service);
                Ok(())
            });
            Ok(())
        }).map_err(|err| {
            error!("Error with TcpListener: {:?}", err);
        })
    });
    join.join();
}
/// Proxy service state; `main` wraps one instance in an `Arc` and passes it to
/// every bound connection.
#[derive(Clone)]
struct Proxy {
/// Shared worker pool; `call` asks it for the next worker when it builds the
/// outbound HTTP client.
pool_ref: Arc<TokioPool>
}
impl Proxy {
    /// Builds the upstream URI for a proxied request.
    ///
    /// Concatenates `host` (scheme and authority, e.g. `http://localhost:8000`)
    /// with the path of `uri` and, when present, its query string.
    ///
    /// Returns the `UriError` from parsing if the combined string is not a
    /// valid URI.
    fn create_proxy_url(&self, host: &str, uri: Uri) -> Result<Uri, UriError> {
        // `Uri::query` yields the query without its leading `?`, so the
        // separator has to be restored when re-assembling the target.
        let target = match uri.query() {
            Some(query) => format!("{}{}?{}", host, uri.path(), query),
            None => format!("{}{}", host, uri.path()),
        };
        target.parse()
    }
}
impl Service for Proxy {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item=Self::Response, Error = Self::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
let method = req.method().clone();
let host = "http://localhost:8000"; // other host
let uri = self.create_proxy_url(host, req.uri().clone())
.expect(&format!("Failed trying to parse uri. Origin: {:?}", &req.uri()));
let mut client_req = Request::new(method, uri);
client_req.headers_mut().extend(req.headers().iter());
client_req.set_body(req.body());
info!("Dispatching incoming connection: {:?}", client_req);
let new_handler = self.pool_ref.next_worker().handle().unwrap().clone();
let client = Client::new(&new_handler);
let resp = client.request(client_req)
.then(move |result| {
match result {
Ok(client_resp) => {
Ok(client_resp)
}
Err(e) => {
error!("{:?}", &e);
Err(e)
}
}
});
Box::new(resp)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment