-
-
Save seanmonstar/08b4192ff69120d8042914e66d0c0f9f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![deny(missing_docs)] | |
#![deny(warnings)] | |
//! A simple HTTP server based on a REST API.
//!
//! You can perform the following actions:
//!
//! * `GET: /[key]`: returns the value corresponding to [key].
//! * `PUT: /[key]`: creates a new entry or updates it with the value passed in the body.
//! * `DELETE: /[key]`: removes the corresponding [key] from the server.
extern crate crc; | |
extern crate futures; | |
extern crate hyper; | |
#[cfg(target_os = "linux")] | |
extern crate libc; | |
extern crate net2; | |
extern crate num_cpus; | |
extern crate reader_writer; | |
extern crate tokio_core; | |
use crc::{crc32, Hasher32}; | |
use futures::{task, Async, Future, Poll, Stream}; | |
use futures::future::BoxFuture; | |
use futures::sync::mpsc; | |
use hyper::{Delete, Get, Put, StatusCode}; | |
use hyper::header::ContentLength; | |
use hyper::server::{Http, Service, Request, Response}; | |
use tokio_core::reactor::Core; | |
use tokio_core::net::{TcpListener, TcpStream}; | |
use std::env; | |
use std::thread; | |
use std::path::Path; | |
use std::net::SocketAddr; | |
use std::io::{Read, Write}; | |
use std::fs::File; | |
use std::os::unix::io::{IntoRawFd, FromRawFd}; | |
/// Breaks `s` apart on `/` and returns the non-empty segments in their
/// original order. Leading, trailing and repeated slashes produce no entries.
fn splitter(s: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    for piece in s.split('/') {
        if !piece.is_empty() {
            segments.push(piece);
        }
    }
    segments
}
#[derive(Clone)] | |
struct AlphaBravo { | |
rw: reader_writer::ReaderWriter, | |
} | |
impl AlphaBravo { | |
pub fn new<P: AsRef<Path>>(path: &P) -> AlphaBravo { | |
AlphaBravo { | |
rw: reader_writer::ReaderWriter::new(path).expect("ReaderWriter::new failed"), | |
} | |
} | |
} | |
impl Service for AlphaBravo { | |
type Request = Request; | |
type Response = Response; | |
type Error = hyper::Error; | |
type Future = BoxFuture<Self::Response, Self::Error>; | |
fn call(&self, req: Request) -> Self::Future { | |
let path = req.path().to_owned(); | |
futures::future::ok(match *req.method() { | |
Get => { | |
let values = splitter(&path); | |
if values.len() < 2 || !self.rw.exists(&values[values.len() - 2]) { | |
Response::new().with_status(StatusCode::NotFound) | |
} else if let Some(mut file) = self.rw.get_file(&values[values.len() - 2], false) { | |
let mut out = Vec::new(); | |
if file.read_to_end(&mut out).is_ok() { | |
Response::new().with_body(out) | |
} else { | |
Response::new().with_status(StatusCode::InternalServerError) | |
} | |
} else { | |
Response::new().with_status(StatusCode::NotFound) | |
} | |
} | |
Put => { | |
let values = splitter(&path); | |
if values.len() < 2 { | |
Response::new().with_status(StatusCode::NoContent) | |
} else if let Some(file) = self.rw.get_file(&"null"/*&values[values.len() - 2]*/, true) { | |
match req.headers().get::<ContentLength>() { | |
Some(&ContentLength(len)) => { | |
if len < 1 { | |
Response::new().with_status(StatusCode::NotModified) | |
} else { | |
let digest = crc32::Digest::new(crc32::IEEE) | |
return req.body().fold((file, digent), move |(mut file, mut digest), chunk| { | |
digest.write(&*chunk); | |
if file.write(&*chunk).is_ok() { | |
Ok((file, digest)) | |
} else { | |
file.into_raw_fd(); | |
Err(hyper::Error::Status) | |
} | |
}).then(move |r| { | |
match r { | |
Ok((_file, digest) => { | |
digest.sum32(); | |
/*if file.sync_data().is_ok() { | |
*/Ok(Response::new().with_status(StatusCode::Ok)) | |
/*} else { | |
Ok(Response::new().with_status(StatusCode::InternalServerError)) | |
}*/ | |
} | |
Err(_) => Ok(Response::new().with_status( | |
StatusCode::InsufficientStorage)), | |
} | |
}).boxed(); | |
} | |
} | |
None => Response::new().with_status(StatusCode::NotModified), | |
} | |
} else { | |
Response::new().with_status(StatusCode::InternalServerError) | |
} | |
} | |
Delete => { | |
let values = splitter(&path); | |
if values.len() >= 2 && self.rw.exists(&values[values.len() - 2]) { | |
match self.rw.remove(&values[values.len() - 2]) { | |
Ok(_) => Response::new().with_status(StatusCode::Ok), | |
Err(_) => Response::new().with_status(StatusCode::InternalServerError), | |
} | |
} else { | |
Response::new().with_status(StatusCode::NotFound) | |
} | |
} | |
_ => { | |
Response::new().with_status(StatusCode::NotFound) | |
} | |
}).boxed() | |
} | |
} | |
type ThreadData = mpsc::UnboundedSender<(TcpStream, SocketAddr)>; | |
fn make_reader_threads<P: AsRef<Path> + Send + Sync>(path: P, | |
total: usize) -> Vec<ThreadData> { | |
let mut thread_data = Vec::new(); | |
for _ in 0..total { | |
let (tx, rx) = mpsc::unbounded(); | |
thread_data.push(tx); | |
let rpath = path.as_ref(); | |
let ppath = rpath.to_path_buf(); | |
thread::spawn(move || { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let mut protocol = Http::new(); | |
protocol.keep_alive(false); | |
core.run(rx.for_each(move |(socket, addr)| { | |
protocol.bind_connection(&handle, socket, addr, AlphaBravo::new(&ppath)); | |
}).unwrap(); | |
}); | |
} | |
thread_data | |
} | |
fn serve<P: AsRef<Path> + Send + Sync>(addr: &SocketAddr, path: P, total: usize) { | |
let thread_data = make_reader_threads(path, total); | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let listener = net2::TcpBuilder::new_v4().unwrap() | |
.bind(addr).unwrap() | |
.listen(1000).unwrap(); | |
let listener = TcpListener::from_listener(listener, addr, &handle).unwrap(); | |
let mut counter = 0; | |
core.run(listener.incoming().for_each(move |(socket, addr)| { | |
thread_data[counter].send((socket, addr)).unwrap(); | |
counter += 1; | |
if counter >= total { | |
counter = 0; | |
} | |
Ok(()) | |
})).unwrap(); | |
} | |
fn start_server<P: AsRef<Path> + Send + Sync>(nb_instances: usize, addr: &str, path: P) { | |
println!("===> Starting server on {} threads", nb_instances); | |
let addr = addr.parse().unwrap(); | |
println!("=> Server listening on \"{}\"", addr); | |
println!("=> Saving files in \"{}\"", path.as_ref().display()); | |
serve(&addr, path, nb_instances); | |
} | |
fn main() { | |
let num_cpus = num_cpus::get(); | |
let num_cpus = if num_cpus > 4 { num_cpus - (num_cpus / 10) * 2 } else { num_cpus }; | |
if let Some(path) = env::args().nth(1) { | |
start_server(num_cpus, "127.0.0.1:8080", path); | |
} else { | |
println!("No path received as argument so files will be stored in \"/tmp/data-rs/\"."); | |
println!("If you want to specify a path, just start like this: \"./http-server [PATH]\""); | |
println!(""); | |
start_server(num_cpus, "127.0.0.1:8080", "/tmp/data-rs/"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment