Last active
November 29, 2020 22:03
-
-
Save u8sand/49a5cf63e81753bec08c52fb5127c6de to your computer and use it in GitHub Desktop.
Rust Disk Matrix Transpose: for transposing large matrix data-frames without lots of memory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// #!/usr/bin/env run-cargo-script | |
//! Uncomment first line & remove extension for use as cargo rust script, left as .rs for syntax highlighting on github. | |
//! | |
//! Transpose large matrix data-frames in csv/tsv format with | |
//! extremely low memory usage. It does this by reading | |
//! the matrix top-down. | |
//! | |
//! For usage instructions see rmdt --help | |
//! | |
//! ```cargo | |
//! [dependencies] | |
//! bytecount = "0.6.0" | |
//! clap = "2.33" | |
//! indicatif = "0.15" | |
//! memchr = "2.3.4" | |
//! ``` | |
//! | |
//! Changelog: | |
//! v6: | |
//! Fixed in-memory mode to support as much memory as you have. | |
//! v5: | |
//! Add in-memory mode which simply loads the full file into a bufreader | |
//! unfortunately, for now, this seems to have a maximum capacity of 2^31. | |
//! v4: | |
//! Eliminate second vector copying by using BufRead::fill_buf directly
//! Eliminate temporary files by seeking around the file | |
//! this has the benefit of not requiring n file writers/readers | |
//! Speed about the same, memory usage much less, and certain transposes are | |
//! possible when they were not before. | |
//! v3: Abstract out csv traversal | |
//! Eliminate additional vec heap allocations during col splitting | |
//! >2x speedup | |
//! v2: Added progress bar | |
//! v1: Added base transpose script | |
// Dependencies | |
#![feature(bufreader_seek_relative)] | |
#[macro_use] extern crate clap; | |
extern crate indicatif; | |
extern crate bytecount; | |
extern crate memchr; | |
use memchr::{memchr,memchr2}; | |
use std::io::{Cursor, Read, Seek, BufRead, BufReader, Write, BufWriter, stdout, ErrorKind}; | |
use std::fs::File; | |
// CLI | |
pub fn cli<'a>() -> clap::ArgMatches<'a> { | |
clap::App::new("Rust Disk Matrix Transpose") | |
.version(crate_version!()) | |
.author("Daniel J. B. Clarke <[email protected]>") | |
.about("Transpose large matrices on disk") | |
.arg( | |
clap::Arg::with_name("column-delimiter") | |
.value_name("DELIMITER") | |
.short("c") | |
.long("column-delimiter") | |
.takes_value(true) | |
.required(false) | |
.help("The delimiter separating each column, defaults to tab") | |
) | |
.arg( | |
clap::Arg::with_name("row-delimiter") | |
.value_name("DELIMITER") | |
.short("r") | |
.long("row-delimiter") | |
.takes_value(true) | |
.required(false) | |
.help("The delimiter separating each row, defaults to newline") | |
) | |
.arg( | |
clap::Arg::with_name("input-file") | |
.value_name("FILE") | |
.short("i") | |
.long("input-file") | |
.takes_value(true) | |
.required(true) | |
.help("The input file to slice") | |
) | |
.arg( | |
clap::Arg::with_name("output-file") | |
.value_name("FILE") | |
.short("o") | |
.long("output-file") | |
.takes_value(true) | |
.required(false) | |
.help("The output file where the transpose will be saved, defaults to stdout") | |
) | |
.arg( | |
clap::Arg::with_name("record-size") | |
.value_name("SIZE") | |
.short("s") | |
.long("record-size") | |
.takes_value(true) | |
.required(false) | |
.help("The maximum record size, too small will result in heap allocations, too big will result in unnecessary reads. Default to estimating it based on average col size") | |
) | |
.arg( | |
clap::Arg::with_name("memory") | |
.short("m") | |
.long("memory") | |
.help("In-memory flag--if the file fits in memory, loading it all in memory will work the fastest..") | |
) | |
.get_matches() | |
} | |
/// Object-safe combination of [`Read`] and [`Seek`], so that a file or an
/// in-memory cursor can sit behind one `Box<dyn ReadSeek>`.
trait ReadSeek: Read + Seek {}

/// Every sized reader that can also seek qualifies automatically; this covers
/// `File` and `Cursor<Vec<u8>>` as well as any other `Read + Seek` source.
impl<T: Read + Seek> ReadSeek for T {}
fn main() -> Result<(), Error> { | |
// process cli arguments | |
let matches = cli(); | |
let input_file = matches.value_of("input-file").unwrap(); | |
let output_file = matches.value_of("output-file").unwrap_or("-"); | |
let column_delimiter = one_and_only(&mut matches.value_of("column-delimiter").unwrap_or("\t").bytes())?; | |
let row_delimiter = one_and_only(&mut matches.value_of("row-delimiter").unwrap_or("\n").bytes())?; | |
let record_size = match matches.value_of("record-size") { | |
Some(s) => Some(s.parse::<usize>()?), | |
None => None, | |
}; | |
let memory = matches.is_present("memory"); | |
if input_file == output_file { | |
return Err(Error::from("In-place transpose not yet supported")); | |
} | |
let input_file_metadata = std::fs::metadata(input_file)?; | |
if !input_file_metadata.is_file() { | |
return Err(Error::from(format!("{} is not a file", input_file))); | |
} | |
let file_size = input_file_metadata.len(); | |
// Setup progress bar | |
let progressbar = indicatif::ProgressBar::new(file_size); | |
progressbar.set_style( | |
indicatif::ProgressStyle::default_bar() | |
.template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes} / {total_bytes} {bytes_per_sec} ETA {eta} {msg}") | |
); | |
let min_size_update = std::cmp::max(file_size / 100, 1); | |
// Step 0: | |
// Prepare the reader | |
let mut fr: BufReader<Box<dyn ReadSeek>> = if memory { | |
progressbar.set_message("Step 0/2. Reading file into memory"); | |
progressbar.set_position(0); | |
let mut buf: Vec<u8> = vec![0; file_size as usize]; | |
let mut f = File::open(input_file)?; | |
let mut pos: usize = 0; | |
let mut pos_update: usize = min_size_update as usize; | |
{ // from read_exact | |
let mut buf: &mut [u8] = buf.as_mut_slice(); | |
while !buf.is_empty() { | |
match f.read(buf) { | |
Ok(0) => break, | |
Ok(n) => { | |
pos += n; | |
if pos >= pos_update { | |
progressbar.set_position(pos as u64); | |
pos_update = pos + pos % (min_size_update as usize); | |
} | |
let tmp = buf; | |
buf = &mut tmp[n..]; | |
} | |
Err(ref e) if e.kind() == ErrorKind::Interrupted => {} | |
Err(e) => return Err(Error::from(e)), | |
} | |
} | |
if !buf.is_empty() { | |
Err(std::io::Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer")) | |
} else { | |
Ok(()) | |
} | |
}?; | |
progressbar.finish_and_clear(); | |
BufReader::new(Box::new(Cursor::new(buf))) | |
} else { | |
BufReader::new(Box::new(File::open(input_file)?)) | |
}; | |
progressbar.reset(); | |
// Step 1: | |
// Traverse the file, locating the start position of each line in the file | |
// and determining the number of columns | |
// positions of start of each line | |
let mut cursors: Vec<usize> = Vec::new(); | |
// number of columns | |
let mut cc = 0; | |
let mut pos: usize = 0; | |
{ | |
progressbar.set_message("Step 1/2. Locating starts of each row"); | |
progressbar.set_position(0); | |
let mut pos_update: usize = min_size_update as usize; | |
cc += 1; | |
cursors.push(0); | |
loop { | |
if pos >= pos_update { | |
progressbar.set_position(pos as u64); | |
pos_update = pos + pos % (min_size_update as usize); | |
} | |
let (done, used) = { | |
let bytes = match fr.fill_buf() { | |
Ok(bytes) => Ok(bytes), | |
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, | |
Err(e) => Err(e), | |
}?; | |
if !bytes.is_empty() { | |
let ret: Result<(bool, usize), Error> = match memchr(row_delimiter, &bytes) { | |
None => { | |
if cursors.len() == 1 { | |
cc += bytecount::count(&bytes, column_delimiter); | |
} | |
Ok((false, bytes.len())) | |
}, | |
Some(ind) => { | |
if cursors.len() == 1 { | |
cc += bytecount::count(&bytes[..ind], column_delimiter); | |
} | |
cursors.push(pos+ind+1); | |
Ok((false, ind+1)) | |
}, | |
}; | |
ret | |
} else { | |
Ok((true, 0)) | |
} | |
}?; | |
pos += used; | |
fr.consume(used); | |
if done { | |
break | |
} | |
} | |
progressbar.finish_and_clear(); | |
} | |
progressbar.reset(); | |
// Step 2: | |
// Traverse file vertically writing to output file | |
let mut fr = if memory { | |
fr.seek_relative(-(pos as i64))?; | |
fr | |
} else { | |
let record_size = match record_size { | |
Some(s) => s, | |
None => { | |
let delimiter_size = cursors.len() * cc - 1; // number of rows (newlines) * columns - last newline (underestimate) | |
let data_size = (file_size as usize) - delimiter_size; // data is whatever isn't a delimiter | |
let estimate = (2*data_size) / delimiter_size; // 2* average is hopefully a decent guess for record size; | |
progressbar.println(format!("Est. record size: {}", estimate)); | |
estimate | |
}, | |
}; | |
let ret: BufReader<std::boxed::Box<dyn ReadSeek>> = BufReader::with_capacity(record_size, Box::new(File::open(input_file)?)); | |
ret | |
}; | |
pos = 0; | |
{ | |
progressbar.set_message("Step 2/2. Transposing"); | |
progressbar.set_position(0); | |
progressbar.println(format!("({}, {}) => ({}, {})", cursors.len(), cc, cc, cursors.len())); | |
let mut fw: Box<dyn Write> = if output_file == "-" { | |
Box::new(BufWriter::new(stdout())) | |
} else { | |
Box::new(BufWriter::new(File::create(output_file)?)) | |
}; | |
let mut excess: Vec<u8> = Vec::new(); | |
let mut output_file_size: usize = 0; | |
let mut output_file_size_update: usize = min_size_update as usize; | |
for row in (0..cc).into_iter() { | |
if row != 0 { | |
fw.write(&[row_delimiter])?; | |
output_file_size += 1; | |
} | |
for (col, cur) in cursors.iter_mut().enumerate() { | |
if output_file_size >= output_file_size_update { | |
progressbar.set_position(output_file_size as u64); | |
output_file_size_update = output_file_size + output_file_size % (min_size_update as usize); | |
} | |
let d_pos = (*cur as i64) - (pos as i64); | |
fr.seek_relative(d_pos)?; | |
pos = *cur; | |
excess.clear(); | |
'inner: loop { | |
let (done, used) = { | |
let bytes = match fr.fill_buf() { | |
Ok(bytes) => Ok(bytes), | |
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, | |
Err(e) => Err(e), | |
}?; | |
if !bytes.is_empty() { | |
let ret: Result<(bool, usize), Error> = match memchr2(column_delimiter, row_delimiter, &bytes) { | |
Some(ind) => { | |
if col != 0 { | |
fw.write(&[column_delimiter])?; | |
output_file_size += 1; | |
} | |
if excess.is_empty() { | |
fw.write(&bytes[..ind])?; | |
output_file_size += ind; | |
*cur += ind+1; | |
} else { | |
excess.extend_from_slice(&bytes[..ind]); | |
fw.write(&excess.as_slice())?; | |
output_file_size += excess.len(); | |
*cur += excess.len()+1; | |
excess.clear(); | |
} | |
Ok((true, ind+1)) | |
}, | |
None => { | |
// We reached the end of the buffer without a col/row remaining | |
// either EOF or end of buffer | |
excess.extend_from_slice(bytes); | |
Ok((false, bytes.len())) | |
}, | |
}; | |
ret | |
} else { | |
if !excess.is_empty() { | |
if col != 0 { | |
fw.write(&[column_delimiter])?; | |
output_file_size += 1; | |
} | |
fw.write(&excess.as_slice())?; | |
output_file_size += excess.len(); | |
} | |
Ok((true, 0)) | |
} | |
}?; | |
pos += used; | |
fr.consume(used); | |
if done { | |
break 'inner; | |
} | |
} | |
} | |
} | |
progressbar.finish_and_clear(); | |
} | |
Ok(()) | |
} | |
// Utility functions | |
/// Extracts the single item of `it`.
///
/// Pulls at most two items from the iterator: the first is the candidate
/// result, the second only serves to detect surplus input.
///
/// Returns `Err("Expected one, got none")` for an empty iterator and
/// `Err("Expected one, got many")` when more than one item is available.
fn one_and_only<T>(it: &mut impl Iterator<Item = T>) -> Result<T, &str> {
    let sole = match it.next() {
        Some(item) => item,
        None => return Err("Expected one, got none"),
    };
    if it.next().is_some() {
        return Err("Expected one, got many");
    }
    Ok(sole)
}
// Error handlers | |
/// Program-wide error type: a newtype around [`std::io::Error`] so that `?`
/// can be used on every failure source that appears in this file.
#[derive(Debug)]
struct Error(std::io::Error);

impl Error {
    /// Wraps a free-form message as an I/O error of kind `Other`.
    fn other(message: String) -> Self {
        Error(std::io::Error::new(std::io::ErrorKind::Other, message))
    }
}

/// Owned messages become `ErrorKind::Other` errors.
impl From<String> for Error {
    fn from(error: String) -> Self {
        Self::other(error)
    }
}

/// Borrowed messages are copied into an `ErrorKind::Other` error.
impl From<&str> for Error {
    fn from(error: &str) -> Self {
        Self::other(error.to_owned())
    }
}

/// I/O errors are wrapped unchanged.
impl From<std::io::Error> for Error {
    fn from(error: std::io::Error) -> Self {
        Error(error)
    }
}

/// Failed integer conversions map to a fixed message.
impl From<std::num::TryFromIntError> for Error {
    fn from(_error: std::num::TryFromIntError) -> Self {
        Self::other(String::from("Failed to convert integer, out of bounds"))
    }
}

/// Failed integer parsing maps to a fixed message.
impl From<std::num::ParseIntError> for Error {
    fn from(_error: std::num::ParseIntError) -> Self {
        Self::other(String::from("Failed to convert string to integer"))
    }
}

/// Present so that `?` type-checks on infallible conversions.
impl From<std::convert::Infallible> for Error {
    fn from(_error: std::convert::Infallible) -> Self {
        Self::other(String::from("Failed to convert integer, out of bounds"))
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment