Skip to content

Instantly share code, notes, and snippets.

@listochkin
Created June 19, 2023 22:44
Show Gist options
  • Save listochkin/54f103fefc1f7e67bc789462dce72f11 to your computer and use it in GitHub Desktop.
Fetcher test for pnpm
# Cargo manifest for the pnpm fetcher benchmark binary.
[package]
name = "fetch_test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
# Keep debug symbols in release builds so profilers show readable stacks.
[profile.release]
debug = true
[dependencies]
# Async runtime; "full" enables the multi-thread scheduler, time, fs, etc.
tokio = { version = "1", features = ["full"] }
bytes = "1"
# HTTP client used to download the tarball from the npm registry.
reqwest = "0.11"
# Diagnostic error reporting (Result/IntoDiagnostic used throughout main.rs).
miette = "5"
# Tar archive reader.
tar = "0.4"
# Subresource-integrity hashes (the values cacache returns).
ssri = "9"
# Fast gzip decompression (requires preallocated output buffer).
libdeflater = "0.14"
# Content-addressable store; tokio-runtime gives the async write APIs,
# default features disabled to drop the async-std backend.
cacache = { version = "11", features = ["tokio-runtime", "mmap"], default-features = false }
serde_json = "1"
use miette::{IntoDiagnostic, Result};
use reqwest::Client;
use ssri::Integrity;
use std::{
collections::HashMap,
io::{Cursor, Read},
path::PathBuf,
time::Instant,
};
use tar::Archive;
use tokio::task::JoinSet;
// On-disk directory of the content-addressable cacache store.
const STORE_DIR: &str = "pnpm-store";
#[tokio::main]
async fn main() {
let start = Instant::now();
let response = fetch_tarball().await.unwrap();
let decompressed_response = decompress_gzip(&response).unwrap();
extract_tarball_parallel(decompressed_response).await.unwrap();
// extract_tarball(decompressed_response).unwrap();
let duration = start.elapsed();
println!("Time elapsed in expensive_function() is: {:?}", duration);
println!("Hello, world!");
}
async fn fetch_tarball() -> Result<bytes::Bytes, Box<dyn std::error::Error>> {
let client = Client::new();
let res = client
.get("https://registry.npmjs.org/typescript/-/typescript-3.5.3.tgz")
.send()
.await?;
Ok(res.bytes().await?)
}
pub fn decompress_gzip(gz_data: &[u8]) -> Result<Vec<u8>> {
// gzip RFC1952: a valid gzip file has an ISIZE field in the
// footer, which is a little-endian u32 number representing the
// decompressed size. This is ideal for libdeflate, which needs
// preallocating the decompressed buffer.
let isize = {
let isize_start = gz_data.len() - 4;
let isize_bytes: [u8; 4] =
gz_data[isize_start..].try_into().into_diagnostic()?;
u32::from_le_bytes(isize_bytes) as usize
};
let mut decompressor = libdeflater::Decompressor::new();
let mut outbuf = vec![0; isize];
decompressor
.gzip_decompress(gz_data, &mut outbuf)
.into_diagnostic()?;
Ok(outbuf)
}
/// Sequentially extracts a decompressed tar archive: each file's bytes
/// are written into the content-addressable store under their integrity
/// hash, and a `path -> hash` index is stored at the end.
///
/// # Errors
/// Propagates archive-iteration, I/O, path, and store-write failures as
/// diagnostics (the original unwrapped `path()`, `strip_prefix`,
/// `parent()` and `to_str()`, panicking on malformed archives).
pub fn extract_tarball(data: Vec<u8>) -> miette::Result<()> {
    // Generate the tarball archive given the decompressed bytes.
    let mut node_archive = Archive::new(Cursor::new(data));
    // Maps each file path (relative to the package root) to the
    // integrity hash cacache assigned to its contents.
    let mut cas_file_map: HashMap<String, Integrity> = HashMap::new();
    // Directories created so far, to skip redundant create_dir_all calls.
    let mut created_directories: Vec<PathBuf> = vec![];
    for entry in node_archive.entries().into_diagnostic()? {
        let mut entry = entry.into_diagnostic()?;
        // Read the entry's contents; the tar header gives us the size.
        let mut buffer = Vec::with_capacity(entry.size() as usize);
        entry.read_to_end(&mut buffer).into_diagnostic()?;
        let entry_path = entry.path().into_diagnostic()?;
        // npm tarballs nest everything under `package/`; strip it so map
        // keys read like `lib/index.js`.
        let cleaned_entry_path = entry_path.strip_prefix("package/").into_diagnostic()?;
        let cleaned_entry_path_string = cleaned_entry_path
            .to_str()
            .ok_or_else(|| miette::miette!("non-UTF-8 path in tarball: {:?}", cleaned_entry_path))?
            .to_string();
        // Create the entry's parent directory once per distinct parent.
        if let Some(entry_path_parent) = entry_path.parent() {
            if !created_directories.iter().any(|p| p == entry_path_parent) {
                created_directories.push(entry_path_parent.to_path_buf());
                std::fs::create_dir_all(entry_path_parent).into_diagnostic()?;
            }
        }
        // Write the contents into the store; cacache returns the hash.
        let sri = cacache::write_hash_sync(STORE_DIR, &buffer).into_diagnostic()?;
        cas_file_map.insert(cleaned_entry_path_string, sri);
    }
    // Persist the path -> hash index.
    // TODO: "xxxxxxxxx" is a placeholder key — use the real package key.
    cacache::write_sync(
        STORE_DIR,
        "xxxxxxxxx",
        serde_json::to_string(&cas_file_map).into_diagnostic()?,
    )
    .into_diagnostic()?;
    Ok(())
}
/// Extracts a decompressed tar archive into the content-addressable
/// store, writing/hashing file contents concurrently on the tokio
/// runtime (one task per entry), then stores a `path -> hash` index.
///
/// # Errors
/// Propagates archive-iteration, I/O, path, join, and store-write
/// failures as diagnostics (the original unwrapped `path()`,
/// `strip_prefix`, `parent()` and `to_str()`, panicking on malformed
/// archives).
pub async fn extract_tarball_parallel(data: Vec<u8>) -> miette::Result<()> {
    // Generate the tarball archive given the decompressed bytes.
    let mut node_archive = Archive::new(Cursor::new(data));
    // Maps each file path (relative to the package root) to its hash.
    let mut cas_file_map: HashMap<String, Integrity> = HashMap::new();
    // Directories created so far, to skip redundant create_dir_all calls.
    let mut created_directories: Vec<PathBuf> = vec![];
    // One task per entry; each returns (path, hash-write result).
    let mut tasks = JoinSet::new();
    for entry in node_archive.entries().into_diagnostic()? {
        let mut entry = entry.into_diagnostic()?;
        let mut buffer = Vec::with_capacity(entry.size() as usize);
        entry.read_to_end(&mut buffer).into_diagnostic()?;
        let entry_path = entry.path().into_diagnostic()?;
        // Strip the `package/` prefix every npm tarball carries.
        let cleaned_entry_path = entry_path.strip_prefix("package/").into_diagnostic()?;
        let cleaned_entry_path_string = cleaned_entry_path
            .to_str()
            .ok_or_else(|| miette::miette!("non-UTF-8 path in tarball: {:?}", cleaned_entry_path))?
            .to_string();
        // Create the entry's parent directory once per distinct parent.
        if let Some(entry_path_parent) = entry_path.parent() {
            if !created_directories.iter().any(|p| p == entry_path_parent) {
                created_directories.push(entry_path_parent.to_path_buf());
                std::fs::create_dir_all(entry_path_parent).into_diagnostic()?;
            }
        }
        tasks.spawn(async move {
            let sri_result = cacache::write_hash(STORE_DIR, &buffer).await;
            (cleaned_entry_path_string, sri_result)
        });
    }
    // Drain results as tasks finish (completion order, not spawn order —
    // fine, since the map is keyed by path).
    while let Some(joined) = tasks.join_next().await {
        let (cleaned_entry_path_string, sri_result) = joined.into_diagnostic()?;
        let sri = sri_result.into_diagnostic()?;
        cas_file_map.insert(cleaned_entry_path_string, sri);
    }
    // Persist the path -> hash index with the async write so we don't
    // block the runtime (the original used write_sync inside async).
    // TODO: "xxxxxxxxx" is a placeholder key — use the real package key.
    cacache::write(
        STORE_DIR,
        "xxxxxxxxx",
        serde_json::to_string(&cas_file_map).into_diagnostic()?,
    )
    .await
    .into_diagnostic()?;
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment