Skip to content

Instantly share code, notes, and snippets.

@jrcavani
Last active December 19, 2023 03:20
Show Gist options
  • Save jrcavani/a1f33ebd83f14784d36e7f75d8d85e5d to your computer and use it in GitHub Desktop.
Save jrcavani/a1f33ebd83f14784d36e7f75d8d85e5d to your computer and use it in GitHub Desktop.
tar vs tokio-tar
use anyhow::Result;
use clap::Parser;
use futures::TryStreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use indicatif_log_bridge::LogWrapper;
#[allow(unused_imports)]
use log::{error, info};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::Semaphore;
use tokio_tar::{Archive, Builder};
use tokio_util::task::TaskTracker;
/// Re-archive chunk tarballs entry by entry, staging every entry in a scratch directory.
#[derive(Parser)]
struct Cli {
    /// Directory holding the input tarballs (`0000.tar`, `0001.tar`, ...); the rewritten `new-NNNN.tar` files are written here too
    #[clap(long)]
    output_dir: String,
    /// Scratch directory where each tarball entry is extracted before being re-archived
    #[clap(long)]
    temp_blobs_dir: String,
    /// Maximum number of chunks processed concurrently (must be at least 1)
    #[clap(long, default_value_t = 4)]
    n_workers: usize,
}
async fn make_tarball(
chunk_idx: usize,
input_path: &str,
output_path: &str,
temp_blobs_dir: &str,
pb_manager: &MultiProgress,
) -> Result<()> {
let pb = pb_manager.add(ProgressBar::new(10000));
let sty = ProgressStyle::with_template(
"{spinner:.red} {prefix:24} [{bar:40.red/white}] {pos:>5}/{len:5} [{elapsed}<{eta} {per_sec:.green}] {msg}",
)?
.progress_chars("#>-");
pb.set_style(sty);
pb.set_prefix(format!(" Archiver: chunk {chunk_idx}"));
// input
let mut input_tarball = Archive::new(BufReader::new(File::open(input_path).await?));
let mut entries = input_tarball.entries()?;
let mut tarball = Builder::new(BufWriter::new(File::create(output_path).await?));
while let Some(mut entry) = entries.try_next().await? {
let name = entry
.path()?
.to_str()
.ok_or(anyhow::anyhow!("Invalid path."))?
.to_owned();
let tmp_path = format!("{temp_blobs_dir}/{name}");
entry.unpack(&tmp_path).await?;
tarball.append_path_with_name(&tmp_path, name).await?;
pb.inc(1);
}
let mut writer = tarball.into_inner().await?;
writer.flush().await?;
pb.finish_and_clear();
pb_manager.remove(&pb);
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let logger = env_logger::Builder::from_env(env_logger::Env::default()).build();
let pb_manager = MultiProgress::new();
LogWrapper::new(pb_manager.clone(), logger).try_init()?;
let args = Cli::parse();
// create output dirs
tokio::fs::create_dir_all(&args.output_dir).await?;
tokio::fs::create_dir_all(&args.temp_blobs_dir).await?;
let pb_main = pb_manager.add(ProgressBar::new_spinner());
let sty = ProgressStyle::with_template(
"==== {msg} | Done: {pos} chunks this round [{elapsed_precise} {per_sec:.green}] ====",
)?;
pb_main.set_style(sty);
let sem = Arc::new(Semaphore::new(args.n_workers));
let tracker = TaskTracker::new();
for chunk_idx in 0..31 {
let temp_blobs_dir = args.temp_blobs_dir.clone();
let pb_manager = pb_manager.clone();
let pb_main_ = pb_main.clone();
let output_dir = args.output_dir.clone();
let permit = Arc::clone(&sem).acquire_owned().await;
tracker.spawn(async move {
let _permit = permit; // consume semaphore
let basename = format!("{chunk_idx:04}.tar");
let tarball_path = format!("{output_dir}/{basename}");
let new_tarball_path = format!("{output_dir}/new-{basename}");
make_tarball(
chunk_idx,
&tarball_path,
&new_tarball_path,
&temp_blobs_dir,
&pb_manager,
)
.await?;
pb_main_.inc(1);
anyhow::Ok(())
});
pb_main.set_message(format!("Sent chunk {chunk_idx}"));
}
tracker.close();
tracker.wait().await;
pb_main.finish();
Ok(())
}
use anyhow::Result;
use clap::Parser;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use indicatif_log_bridge::LogWrapper;
#[allow(unused_imports)]
use log::{error, info};
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use tar::{Archive, Builder};
/// Re-archive chunk tarballs entry by entry, staging every entry in a scratch directory.
#[derive(Parser)]
struct Cli {
    /// Directory holding the input tarballs (`0000.tar`, `0001.tar`, ...); the rewritten `new-NNNN.tar` files are written here too
    #[clap(long)]
    output_dir: String,
    /// Scratch directory where each tarball entry is extracted before being re-archived
    #[clap(long)]
    temp_blobs_dir: String,
    /// Number of worker threads in the rayon pool (0 lets rayon choose)
    #[clap(long, default_value_t = 4)]
    n_workers: usize,
}
fn make_tarball(
chunk_idx: usize,
input_path: &str,
output_path: &str,
temp_blobs_dir: &str,
pb_manager: &MultiProgress,
) -> Result<()> {
let pb = pb_manager.add(ProgressBar::new(10000));
let sty = ProgressStyle::with_template(
"{spinner:.red} {prefix:24} [{bar:40.red/white}] {pos:>5}/{len:5} [{elapsed}<{eta} {per_sec:.green}] {msg}",
)?
.progress_chars("#>-");
pb.set_style(sty);
pb.set_prefix(format!(" Archiver: chunk {chunk_idx}"));
// input
let mut input_tarball = Archive::new(BufReader::new(File::open(input_path)?));
let entries = input_tarball.entries()?;
let mut tarball = Builder::new(BufWriter::new(File::create(output_path)?));
for entry in entries {
let mut entry = entry?;
let name = entry
.path()?
.to_str()
.ok_or(anyhow::anyhow!("Invalid path."))?
.to_owned();
let tmp_path = format!("{temp_blobs_dir}/{name}");
entry.unpack(&tmp_path)?;
tarball.append_path_with_name(&tmp_path, name)?;
pb.inc(1);
}
let mut writer = tarball.into_inner()?;
writer.flush()?;
pb.finish_and_clear();
pb_manager.remove(&pb);
Ok(())
}
/// Rewrites the input tarballs `0000.tar` through `0030.tar` in `--output-dir` into
/// `new-NNNN.tar` files next to them, in parallel on a rayon pool of `--n-workers` threads.
///
/// # Errors
/// Returns an error if logger or progress-bar setup fails, if the directories cannot be
/// created, if the global rayon pool cannot be built (e.g. it was already initialized),
/// or if any chunk fails (the error names the chunk).
fn main() -> Result<()> {
    const N_CHUNKS: usize = 31;

    let logger = env_logger::Builder::from_env(env_logger::Env::default()).build();
    let pb_manager = MultiProgress::new();
    LogWrapper::new(pb_manager.clone(), logger).try_init()?;
    let args = Cli::parse();
    // create output dirs
    std::fs::create_dir_all(&args.output_dir)?;
    std::fs::create_dir_all(&args.temp_blobs_dir)?;
    let pb_main = pb_manager.add(ProgressBar::new_spinner());
    let sty = ProgressStyle::with_template(
        "==== {msg} | Done: {pos} chunks this round [{elapsed_precise} {per_sec:.green}] ====",
    )?;
    pb_main.set_style(sty);
    // `num_threads(0)` lets rayon choose the thread count itself.
    rayon::ThreadPoolBuilder::new()
        .num_threads(args.n_workers)
        .build_global()?;
    // The closure only borrows `args`, `pb_manager` and `pb_main`; no per-chunk clones needed.
    (0..N_CHUNKS).into_par_iter().try_for_each(|chunk_idx| {
        let output_dir = &args.output_dir;
        let basename = format!("{chunk_idx:04}.tar");
        let tarball_path = format!("{output_dir}/{basename}");
        let new_tarball_path = format!("{output_dir}/new-{basename}");
        make_tarball(
            chunk_idx,
            &tarball_path,
            &new_tarball_path,
            &args.temp_blobs_dir,
            &pb_manager,
        )
        .map_err(|e| e.context(format!("chunk {chunk_idx}")))?;
        pb_main.inc(1);
        pb_main.set_message(format!("Sent chunk {chunk_idx}"));
        anyhow::Ok(())
    })?;
    pb_main.finish();
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment