Execute cargo new --bin httpdl
Cargo.toml
[package]
name = "httpdl"
version = "0.1.0"
authors = ["Igor Baidiuk <[email protected]>"]
[dependencies]
main.rs
// Placeholder entry point generated by `cargo new --bin`.
fn main() {
    println!("Hello, world!");
}
Cargo.toml
[package]
name = "httpdl"
version = "0.1.0"
authors = ["Igor Baidiuk <[email protected]>"]
description = "Simplistic CLI batch file downloader over HTTP protocol"
[dependencies]
Execute cargo run
Add CLAP
[dependencies]
clap = "*"
Add crate to main
#[macro_use]
extern crate clap;
// Entry point unchanged so far; the clap crate is merely linked in above.
fn main() {
    println!("Hello, world!");
}
Compile, watch progress
Add Args struct
// Entry point still the stub; the Args struct is introduced next.
fn main() {
    println!("Hello, world!");
}
// Parsed command-line options for the downloader.
#[derive(Debug)]
struct Args {
    // Directory where downloaded files are stored (-o).
    dest_dir: String,
    // Path of the file listing URLs and local names to download (-f).
    list_file: String,
    // Number of worker threads to use (-n).
    threads_num: usize,
    // Download speed cap in bytes per second; 0 means no limit (-l).
    speed_limit: usize,
}
Add parse_args
// Build the CLI definition with clap and collect the matches into Args.
// NOTE(review): the `parse().unwrap()` calls panic on malformed numeric
// input; later steps of this tutorial replace them with proper Result-based
// error handling, so they are left as-is here.
fn parse_args() -> Args {
    use clap::Arg;
    // app_from_crate! pulls name/version/authors/description from Cargo.toml.
    let args = app_from_crate!()
        .arg(Arg::with_name("dest_dir")
            .help("Directory where to store downloaded files")
            .short("o")
            .takes_value(true)
            .required(true)
        )
        .arg(Arg::with_name("list_file")
            .help("File which contains list of all URLs to download and local names for them")
            .short("f")
            .takes_value(true)
            .required(true)
        )
        .arg(Arg::with_name("threads_num")
            .help("Number of threads to use")
            .short("n")
            .default_value("1")
        )
        .arg(Arg::with_name("speed_limit")
            .help("Limit speed to N bytes per second; '0' means no limit
Suffixes supported:
k, K - kilobytes (1024 bytes)
m, M - megabytes (1024*1024 bytes)
")
            .short("l")
            .default_value("0")
        )
        .get_matches();
    // All four arguments are required or defaulted, so value_of() is Some.
    Args {
        dest_dir: args.value_of("dest_dir").unwrap().to_owned(),
        list_file: args.value_of("list_file").unwrap().to_owned(),
        threads_num: args.value_of("threads_num").unwrap().parse().unwrap(),
        speed_limit: args.value_of("speed_limit").unwrap().parse().unwrap(),
    }
}
Integrate into main
// Entry point: parse the CLI arguments and echo them back for verification.
fn main() {
    let args = parse_args();
    println!("Arguments: {:?}", args);
}
cargo run -- -o dest_dir
cargo run -- -o dest_dir -f files.lst
cargo run -- -o dest_dir -f files.llst -n 9
cargo run -- -o dest_dir -f files.llst -n 9 -l 15k
Intermezzo: Rust error handling
Cargo.toml: add error-chain
[dependencies]
clap = "*"
error-chain = "*"
Start with minimal common Error
#[macro_use]
extern crate clap;
#[macro_use]
extern crate error_chain;
mod errors {
error_chain! {
}
}
use errors::*;
Move to quick_main
quick_main!(run);
fn main()
// Application body used by error-chain's `quick_main!`: returning Err makes
// the generated main() print the whole error chain and exit non-zero.
fn run() -> Result<()> {
    let args = parse_args()?;
    println!("Arguments: {:?}", args);
    Ok(())
}
parse_args to Result
fn parse_args() -> Result<Args> {
use clap::Arg;
And final return
Ok(Args {
dest_dir: args.value_of("dest_dir").unwrap().to_owned(),
list_file: args.value_of("list_file").unwrap().to_owned(),
threads_num: args.value_of("threads_num").unwrap().parse().unwrap(),
speed_limit: args.value_of("speed_limit").unwrap().parse().unwrap(),
})
cargo run -- -o dest_dir -f files.llst -n 9 -l 15k
Switch args to safe return
)
.get_matches_safe()?;
Add clap's error to foreign links
mod errors {
error_chain! {
foreign_links {
Args(::clap::Error);
}
}
}
Switch parsers to ?
threads_num: args.value_of("threads_num").unwrap().parse()?,
speed_limit: args.value_of("speed_limit").unwrap().parse()?,
Add parse int error link
foreign_links {
Args(::clap::Error);
ParseInt(::std::num::ParseIntError);
}
cargo run -- -o dest_dir -f files.llst -n 9 -l 15k
parse_arg helper
// Look up a required/defaulted clap argument by name and run `parse` over
// its raw value, wrapping any failure with the offending argument's name.
fn parse_arg<F, R>(args: &clap::ArgMatches, name: &str, parse: F) -> Result<R>
    where F: Fn(&str) -> Result<R>
{
    // Safe to unwrap: every argument is either required or has a default.
    let raw = args.value_of(name).unwrap();
    let parsed = parse(raw);
    parsed.chain_err(|| format!("Invalid argument <{}>", name))
}
Move Args return
return Ok(Args {
dest_dir: parse_arg(&args, "dest_dir", |s| Ok(s.to_owned()) )?,
list_file: parse_arg(&args, "list_file", |s| Ok(s.to_owned()) )?,
threads_num: parse_arg(&args, "threads_num", |s| Ok(s.parse()?) )?,
speed_limit: parse_arg(&args, "speed_limit", |s| Ok(s.parse()?) )?,
});
cargo run -- -o dest_dir -f files.llst -n 9 -l 15k
Add std::fs
extern crate error_chain;
use std::fs;
Add io::Error foreign link
foreign_links {
Args(::clap::Error);
ParseInt(::std::num::ParseIntError);
IO(::std::io::Error);
}
Implement parse_dir
// Validate that `path` names an existing directory and hand it back owned.
// fs::metadata fails (and `?` propagates) when the path does not exist.
fn parse_dir(path: &str) -> Result<String> {
    let meta = fs::metadata(path)?;
    if meta.is_dir() {
        Ok(path.to_owned())
    } else {
        bail!("{}: expected directory", path)
    }
}
dest_dir: parse_arg(&args, "dest_dir", parse_dir )?,
Implement parse_file
// Validate that `path` names an existing regular file and return it owned.
// fs::metadata follows symlinks, so a symlink to a file is accepted;
// a missing path surfaces as an IO error via `?`.
fn parse_file(path: &str) -> Result<String> {
    if !fs::metadata(path)?.is_file() {
        bail!("{}: expected file", path)
    }
    Ok(path.to_owned())
}
list_file: parse_arg(&args, "list_file", parse_file )?,
Implement parse_threads_num
// Parse the thread count, rejecting zero (the program always needs at
// least one worker).
fn parse_threads_num(value: &str) -> Result<usize> {
    let count: usize = value.parse()?;
    if count == 0 {
        bail!("cannot be zero");
    }
    Ok(count)
}
threads_num: parse_arg(&args, "threads_num", parse_threads_num )?,
Implement parse_speed_limit
// Parse a speed limit such as "500", "15k" or "2M" into bytes per second.
// A trailing k/K multiplies by 1024, m/M by 1024*1024; any other final
// character is treated as part of the number. 0 means "no limit".
// NOTE(review): an empty string yields Ok(0) (i.e. unlimited) instead of
// an error — confirm this is intended.
fn parse_speed_limit(value: &str) -> Result<usize> {
    match value.char_indices().last() {
        None => Ok(0),
        Some((last_i, last_ch)) => {
            let multiplier: usize = match last_ch {
                'k' | 'K' => 1024,
                'm' | 'M' => 1024*1024,
                _ => 1,
            };
            // Strip the suffix character only when one was recognized.
            let value = if multiplier == 1 { value } else { &value[..last_i] };
            Ok(value.parse::<usize>()? * multiplier)
        }
    }
}
speed_limit: parse_arg(&args, "speed_limit", parse_speed_limit )?,
Read text from file
let list_text = fs::File::open(&args.list_file)
.and_then(|mut file| {
let mut text = String::new();
file.read_to_string(&mut text)?;
Ok(text)
})
.chain_err(|| format!("Failed to read list file {}", &args.list_file))?;
use std::fs;
use std::io::Read;
Parse into list of URLs
let urls = list_text
.lines()
.filter_map(|line| {
let mut pieces = line.split(|c| " \r\n\t".contains(c)).filter(|s| !s.is_empty());
let url = pieces.next();
let fname = pieces.next();
if let (Some(url), Some(fname)) = (url, fname) {
Some((url, fname))
}
else { None }
});
for (url, fname) in urls {
println!("{} => {}", url, fname);
}
Add Hyper
[dependencies]
clap = "*"
error-chain = "*"
hyper = "0.10"
#[macro_use]
extern crate clap;
#[macro_use]
extern crate error_chain;
extern crate hyper;
foreign_links {
Args(::clap::Error);
ParseInt(::std::num::ParseIntError);
IO(::std::io::Error);
Http(::hyper::Error);
}
Minimal file download function
use std::fs;
use std::io::{self, Read};
use std::path::Path;
// Download `url` into `dir`/`fname` using a fresh hyper 0.10 (synchronous)
// client. Fails on connection errors and on non-success HTTP status codes.
// NOTE(review): the io::copy result is discarded, so an interrupted
// transfer still returns Ok(()) leaving a truncated file on disk —
// consider propagating it with `?` (io::Error is a foreign link).
fn download_file(url: &str, fname: &str, dir: &str) -> Result<()> {
    let mut response = hyper::Client::new().get(url).send()?;
    if !response.status.is_success() {
        bail!("HTTP request failed: {}", response.status);
    }
    let mut file = fs::File::create(Path::new(dir).join(fname))?;
    let _ = io::copy(&mut response, &mut file);
    Ok(())
}
for (url, fname) in urls {
println!("Downloading {} => {}", url, fname);
if let Err(error) = download_file(url, fname, &args.dest_dir) {
eprintln!("Failed to download {} => {}!\nError: {}", url, fname, error);
for err in error.iter().skip(1) {
eprintln!("Caused by: {}", err);
}
}
}
python -m http.server 8000 --bind 127.0.0.1
cargo run -- -o ../dl-here -f ../files.lst
Add crossbeam for multithreading
[dependencies]
clap = "*"
error-chain = "*"
hyper = "0.10"
crossbeam = "*"
extern crate hyper;
extern crate crossbeam;
Move loop to download_files
// Worker loop: pull (url, filename) pairs from `next` until it returns
// None, downloading each into `dir`. Failures are reported to stderr
// together with their error-chain causes, but do not stop the loop.
fn download_files<'a, F>(next: F, dir: &str)
    where F: Fn() -> Option<(&'a str, &'a str)>
{
    while let Some((url, fname)) = next() {
        println!("Downloading {} => {}", url, fname);
        if let Err(error) = download_file(url, fname, dir) {
            eprintln!("Failed to download {} => {}!\nError: {}", url, fname, error);
            // Skip the head of the chain (already printed above).
            for err in error.iter().skip(1) {
                eprintln!("Caused by: {}", err);
            }
        }
    }
}
Multithreading, with mutexes
use std::fs;
use std::io::{self, Read};
use std::path::Path;
use std::sync::Mutex;
let urls = Mutex::new(urls);
let next_url = move || urls.lock().unwrap().next();
crossbeam::scope(|scope| {
for _ in 1..args.threads_num {
let next_url = &next_url;
let dest_dir = &args.dest_dir;
scope.spawn(move || download_files(&next_url, dest_dir));
}
download_files(&next_url, &args.dest_dir);
});
cargo run -- -o ../dl-here -f ../files.lst -n 3
Add thread number to messages
// Worker loop as before, with every log line tagged by the worker id
// `tid` ("#N" for progress, "!N" for errors; `{0}` reuses tid after the
// embedded newline).
fn download_files<'a, F>(tid: usize, next: F, dir: &str)
    where F: Fn() -> Option<(&'a str, &'a str)>
{
    while let Some((url, fname)) = next() {
        println!("#{} Downloading {} => {}", tid, url, fname);
        if let Err(error) = download_file(url, fname, dir) {
            eprintln!("!{} Failed to download {} => {}!\n!{0} Error: {}", tid, url, fname, error);
            for err in error.iter().skip(1) {
                eprintln!("!{} Caused by: {}", tid, err);
            }
        }
    }
}
crossbeam::scope(|scope| {
for n in 1..args.threads_num {
let next_url = &next_url;
let dest_dir = &args.dest_dir;
scope.spawn(move || download_files(n, &next_url, dest_dir));
}
download_files(0, &next_url, &args.dest_dir);
});
Add one-thread mode
use std::sync::Mutex;
use std::cell::RefCell;
if args.threads_num > 1 {
let urls = Mutex::new(urls);
...
});
}
else {
let urls = RefCell::new(urls);
let next_url = move || urls.borrow_mut().next();
download_files(0, next_url, &args.dest_dir);
}
cargo run -- -o ../dl-here -f ../files.lst
Add TokenBucket
use std::cell::RefCell;
use std::time::Instant;
// Token-bucket rate limiter: tokens accumulate at `rate` per second up to
// `capacity`; callers spend one token per byte transferred (see `take`).
#[derive(Debug)]
struct TokenBucket {
    // Refill rate in tokens per second; 0 disables limiting.
    rate: usize,
    // Maximum tokens the bucket can hold (burst size).
    capacity: usize,
    // Current balance; f64 so sub-token refills between calls aren't lost.
    remaining: f64,
    // Moment of the last refill, used by `take` to compute elapsed time.
    timestamp: Instant,
}
impl TokenBucket {
    // Bucket whose burst capacity equals one second's worth of tokens.
    fn new(rate: usize) -> TokenBucket {
        TokenBucket::with_capacity(rate, rate)
    }
    // Bucket with an explicit capacity; starts empty.
    fn with_capacity(rate: usize, capacity: usize) -> TokenBucket {
        TokenBucket {
            rate,
            capacity,
            remaining: 0f64,
            timestamp: Instant::now(),
        }
    }
}
Add take
method
// Withdraw up to `amount` tokens; returns how many were actually granted
// (possibly 0 when the bucket is empty). A rate of 0 means "no limit":
// the full amount is granted without touching the bucket state.
fn take(&mut self, amount: usize) -> usize {
    if self.rate == 0 {
        return amount;
    }
    // Elapsed time since the previous call; mem::replace advances the
    // stored timestamp in the same expression that reads the old one.
    let delta = {
        let now = Instant::now();
        now - std::mem::replace(&mut self.timestamp, now)
    };
    // Refill: elapsed seconds (with nanosecond precision) times the rate,
    // clamped to the bucket's capacity.
    let delta_fill = ((delta.as_secs() as f64) + (delta.subsec_nanos() as f64) / 1_000_000_000f64) * (self.rate as f64);
    self.remaining = (self.remaining + delta_fill).min(self.capacity as f64);
    // Grant whole tokens only, never more than requested or available.
    let taken = std::cmp::min(self.remaining.floor() as usize, amount);
    self.remaining = (self.remaining - (taken as f64)).max(0f64);
    taken
}
Implement copy_limited
use std::io::{self, Read, Write};
// Copy all bytes from `reader` to `writer`, asking `limit` before each read
// how many bytes (out of at most the buffer size) may be transferred now.
// Returns the total number of bytes written; stops at EOF (read of 0).
//
// Cleanup: the original took `&mut` to an already-mutable slice for both
// the read (double reborrow relying on deref coercion) and the write
// (write_all only needs a shared `&[u8]`); both indirections are removed.
fn copy_limited<R, W, F>(reader: &mut R, writer: &mut W, limit: F) -> io::Result<u64>
where
    R: Read + ?Sized,
    W: Write + ?Sized,
    F: Fn(usize) -> usize
{
    let mut buf = [0; 64 * 1024];
    let mut written = 0;
    loop {
        // Ask the rate limiter for the current transfer allowance.
        let limit = limit(buf.len());
        if limit == 0 {
            // No budget right now: yield and retry until tokens refill.
            std::thread::yield_now();
            continue;
        }
        let part = &mut buf[..limit];
        let len = match reader.read(part) {
            Ok(0) => return Ok(written), // EOF
            Ok(len) => len,
            // Interrupted reads are retryable by contract of std::io::Read.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        writer.write_all(&part[..len])?;
        written += len as u64;
    }
}
Migrate download_file
// Download `url` into `dir`/`fname`, throttled through `limit` — a callback
// that grants per-read byte budgets (e.g. a shared TokenBucket's take).
// Fails on connection errors, non-success HTTP status codes, and now also
// on transfer errors.
//
// Fix: the copy_limited result was previously discarded (`let _ = ...`),
// so an interrupted transfer returned Ok(()) leaving a truncated file on
// disk; the I/O error is now propagated (io::Error is a foreign link, so
// `?` converts it into the crate's error type).
fn download_file<L>(url: &str, fname: &str, dir: &str, limit: L) -> Result<()>
    where L: Fn(usize) -> usize
{
    let mut response = hyper::Client::new().get(url).send()?;
    if !response.status.is_success() {
        bail!("HTTP request failed: {}", response.status);
    }
    let mut file = fs::File::create(Path::new(dir).join(fname))?;
    copy_limited(&mut response, &mut file, limit)?;
    Ok(())
}
Migrate download_files
// Worker loop: keeps fetching (url, filename) pairs from `next` and
// downloading each into `dir` under the shared `limit` throttle. Errors
// are logged to stderr (tagged with the worker id `tid`, causes included)
// and the loop continues with the next entry.
fn download_files<'a, F, L>(tid: usize, next: F, dir: &str, limit: L)
    where
        F: Fn() -> Option<(&'a str, &'a str)>,
        L: Fn(usize) -> usize
{
    loop {
        let (url, fname) = match next() {
            Some(pair) => pair,
            None => break,
        };
        println!("#{} Downloading {} => {}", tid, url, fname);
        match download_file(url, fname, dir, &limit) {
            Ok(()) => {}
            Err(error) => {
                eprintln!("!{} Failed to download {} => {}!\n!{0} Error: {}", tid, url, fname, error);
                for cause in error.iter().skip(1) {
                    eprintln!("!{} Caused by: {}", tid, cause);
                }
            }
        }
    }
}
Migrate one-thread mode
let bucket = RefCell::new(TokenBucket::new(args.speed_limit));
let limit = move |amount| bucket.borrow_mut().take(amount);
download_files(0, next_url, &args.dest_dir, limit);
Migrate multithread mode
let bucket = Mutex::new(TokenBucket::new(args.speed_limit));
let limit = move |amount| bucket.lock().unwrap().take(amount);
crossbeam::scope(|scope| {
for n in 1..args.threads_num {
let next_url = &next_url;
let limit = &limit;
let dest_dir = &args.dest_dir;
scope.spawn(move || download_files(n, next_url, dest_dir, limit));
}
download_files(0, &next_url, &args.dest_dir, &limit);
});
Add total time counter
let timer = Instant::now();
if args.threads_num > 1 {
...
}
let timer = Instant::now() - timer;
println!("Took {}.{:03} seconds", timer.as_secs(), timer.subsec_nanos() / 1_000_000);
cargo run -- -o ../dl-here -f ../files.lst -n 3 -l 15k