Last active
December 19, 2023 03:20
-
-
Save jrcavani/a1f33ebd83f14784d36e7f75d8d85e5d to your computer and use it in GitHub Desktop.
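tar vs. tokio-tar: the same tarball-rewriting tool, implemented with async tokio + tokio-tar (first file) and with synchronous tar + rayon (second file)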
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Result; | |
use clap::Parser; | |
use futures::TryStreamExt; | |
use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; | |
use indicatif_log_bridge::LogWrapper; | |
#[allow(unused_imports)] | |
use log::{error, info}; | |
use std::sync::Arc; | |
use tokio::fs::File; | |
use tokio::io::{AsyncWriteExt, BufReader, BufWriter}; | |
use tokio::sync::Semaphore; | |
use tokio_tar::{Archive, Builder}; | |
use tokio_util::task::TaskTracker; | |
#[derive(Parser)] | |
struct Cli { | |
/// Output tar path | |
#[clap(long)] | |
output_dir: String, | |
/// Temp dir to store the downloaded blobs | |
#[clap(long)] | |
temp_blobs_dir: String, | |
/// Number of workers | |
#[clap(long, default_value_t = 4)] | |
n_workers: usize, | |
} | |
async fn make_tarball( | |
chunk_idx: usize, | |
input_path: &str, | |
output_path: &str, | |
temp_blobs_dir: &str, | |
pb_manager: &MultiProgress, | |
) -> Result<()> { | |
let pb = pb_manager.add(ProgressBar::new(10000)); | |
let sty = ProgressStyle::with_template( | |
"{spinner:.red} {prefix:24} [{bar:40.red/white}] {pos:>5}/{len:5} [{elapsed}<{eta} {per_sec:.green}] {msg}", | |
)? | |
.progress_chars("#>-"); | |
pb.set_style(sty); | |
pb.set_prefix(format!(" Archiver: chunk {chunk_idx}")); | |
// input | |
let mut input_tarball = Archive::new(BufReader::new(File::open(input_path).await?)); | |
let mut entries = input_tarball.entries()?; | |
let mut tarball = Builder::new(BufWriter::new(File::create(output_path).await?)); | |
while let Some(mut entry) = entries.try_next().await? { | |
let name = entry | |
.path()? | |
.to_str() | |
.ok_or(anyhow::anyhow!("Invalid path."))? | |
.to_owned(); | |
let tmp_path = format!("{temp_blobs_dir}/{name}"); | |
entry.unpack(&tmp_path).await?; | |
tarball.append_path_with_name(&tmp_path, name).await?; | |
pb.inc(1); | |
} | |
let mut writer = tarball.into_inner().await?; | |
writer.flush().await?; | |
pb.finish_and_clear(); | |
pb_manager.remove(&pb); | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let logger = env_logger::Builder::from_env(env_logger::Env::default()).build(); | |
let pb_manager = MultiProgress::new(); | |
LogWrapper::new(pb_manager.clone(), logger).try_init()?; | |
let args = Cli::parse(); | |
// create output dirs | |
tokio::fs::create_dir_all(&args.output_dir).await?; | |
tokio::fs::create_dir_all(&args.temp_blobs_dir).await?; | |
let pb_main = pb_manager.add(ProgressBar::new_spinner()); | |
let sty = ProgressStyle::with_template( | |
"==== {msg} | Done: {pos} chunks this round [{elapsed_precise} {per_sec:.green}] ====", | |
)?; | |
pb_main.set_style(sty); | |
let sem = Arc::new(Semaphore::new(args.n_workers)); | |
let tracker = TaskTracker::new(); | |
for chunk_idx in 0..31 { | |
let temp_blobs_dir = args.temp_blobs_dir.clone(); | |
let pb_manager = pb_manager.clone(); | |
let pb_main_ = pb_main.clone(); | |
let output_dir = args.output_dir.clone(); | |
let permit = Arc::clone(&sem).acquire_owned().await; | |
tracker.spawn(async move { | |
let _permit = permit; // consume semaphore | |
let basename = format!("{chunk_idx:04}.tar"); | |
let tarball_path = format!("{output_dir}/{basename}"); | |
let new_tarball_path = format!("{output_dir}/new-{basename}"); | |
make_tarball( | |
chunk_idx, | |
&tarball_path, | |
&new_tarball_path, | |
&temp_blobs_dir, | |
&pb_manager, | |
) | |
.await?; | |
pb_main_.inc(1); | |
anyhow::Ok(()) | |
}); | |
pb_main.set_message(format!("Sent chunk {chunk_idx}")); | |
} | |
tracker.close(); | |
tracker.wait().await; | |
pb_main.finish(); | |
Ok(()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Result; | |
use clap::Parser; | |
use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; | |
use indicatif_log_bridge::LogWrapper; | |
#[allow(unused_imports)] | |
use log::{error, info}; | |
use rayon::prelude::*; | |
use std::fs::File; | |
use std::io::{BufReader, BufWriter, Write}; | |
use tar::{Archive, Builder}; | |
#[derive(Parser)] | |
struct Cli { | |
/// Output tar path | |
#[clap(long)] | |
output_dir: String, | |
/// Temp dir to store the downloaded blobs | |
#[clap(long)] | |
temp_blobs_dir: String, | |
/// Number of workers | |
#[clap(long, default_value_t = 4)] | |
n_workers: usize, | |
} | |
fn make_tarball( | |
chunk_idx: usize, | |
input_path: &str, | |
output_path: &str, | |
temp_blobs_dir: &str, | |
pb_manager: &MultiProgress, | |
) -> Result<()> { | |
let pb = pb_manager.add(ProgressBar::new(10000)); | |
let sty = ProgressStyle::with_template( | |
"{spinner:.red} {prefix:24} [{bar:40.red/white}] {pos:>5}/{len:5} [{elapsed}<{eta} {per_sec:.green}] {msg}", | |
)? | |
.progress_chars("#>-"); | |
pb.set_style(sty); | |
pb.set_prefix(format!(" Archiver: chunk {chunk_idx}")); | |
// input | |
let mut input_tarball = Archive::new(BufReader::new(File::open(input_path)?)); | |
let entries = input_tarball.entries()?; | |
let mut tarball = Builder::new(BufWriter::new(File::create(output_path)?)); | |
for entry in entries { | |
let mut entry = entry?; | |
let name = entry | |
.path()? | |
.to_str() | |
.ok_or(anyhow::anyhow!("Invalid path."))? | |
.to_owned(); | |
let tmp_path = format!("{temp_blobs_dir}/{name}"); | |
entry.unpack(&tmp_path)?; | |
tarball.append_path_with_name(&tmp_path, name)?; | |
pb.inc(1); | |
} | |
let mut writer = tarball.into_inner()?; | |
writer.flush()?; | |
pb.finish_and_clear(); | |
pb_manager.remove(&pb); | |
Ok(()) | |
} | |
fn main() -> Result<()> { | |
let logger = env_logger::Builder::from_env(env_logger::Env::default()).build(); | |
let pb_manager = MultiProgress::new(); | |
LogWrapper::new(pb_manager.clone(), logger).try_init()?; | |
let args = Cli::parse(); | |
// create output dirs | |
std::fs::create_dir_all(&args.output_dir)?; | |
std::fs::create_dir_all(&args.temp_blobs_dir)?; | |
let pb_main = pb_manager.add(ProgressBar::new_spinner()); | |
let sty = ProgressStyle::with_template( | |
"==== {msg} | Done: {pos} chunks this round [{elapsed_precise} {per_sec:.green}] ====", | |
)?; | |
pb_main.set_style(sty); | |
rayon::ThreadPoolBuilder::new() | |
.num_threads(args.n_workers) | |
.build_global() | |
.unwrap(); | |
(0..31).into_par_iter().try_for_each(|chunk_idx| { | |
let temp_blobs_dir = args.temp_blobs_dir.clone(); | |
let pb_manager = pb_manager.clone(); | |
let pb_main_ = pb_main.clone(); | |
let output_dir = args.output_dir.clone(); | |
let basename = format!("{chunk_idx:04}.tar"); | |
let tarball_path = format!("{output_dir}/{basename}"); | |
let new_tarball_path = format!("{output_dir}/new-{basename}"); | |
make_tarball( | |
chunk_idx, | |
&tarball_path, | |
&new_tarball_path, | |
&temp_blobs_dir, | |
&pb_manager, | |
)?; | |
pb_main_.inc(1); | |
pb_main.set_message(format!("Sent chunk {chunk_idx}")); | |
anyhow::Ok(()) | |
})?; | |
pb_main.finish(); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment