Skip to content

Instantly share code, notes, and snippets.

@u8sand
Last active November 29, 2020 22:03
Show Gist options
  • Save u8sand/49a5cf63e81753bec08c52fb5127c6de to your computer and use it in GitHub Desktop.
Save u8sand/49a5cf63e81753bec08c52fb5127c6de to your computer and use it in GitHub Desktop.
Rust Disk Matrix Transpose: for transposing large matrix data-frames without lots of memory
// #!/usr/bin/env run-cargo-script
//! Uncomment first line & remove extension for use as cargo rust script, left as .rs for syntax highlighting on github.
//!
//! Transpose large matrix data-frames in csv/tsv format with
//! extremely low memory usage. It does this by reading
//! the matrix top-down.
//!
//! For usage instructions see rmdt --help
//!
//! ```cargo
//! [dependencies]
//! bytecount = "0.6.0"
//! clap = "2.33"
//! indicatif = "0.15"
//! memchr = "2.3.4"
//! ```
//!
//! Changelog:
//! v6:
//! Fixed in-memory mode to support as much memory as you have.
//! v5:
//! Add in-memory mode which simply loads the full file into a bufreader
//! unfortunately, for now, this seems to have a maximum capacity of 2^31.
//! v4:
//! Eliminate second vector copying by using BufRead fill_buf directly
//! Eliminate temporary files by seeking around the file
//! this has the benefit of not requiring n file writers/readers
//! Speed about the same, memory usage much less, and certain transposes are
//! possible when they were not before.
//! v3: Abstract out csv traversal
//! Eliminate additional vec heap allocations during col splitting
//! >2x speedup
//! v2: Added progress bar
//! v1: Added base transpose script
// Dependencies
#![feature(bufreader_seek_relative)]
#[macro_use] extern crate clap;
extern crate indicatif;
extern crate bytecount;
extern crate memchr;
use memchr::{memchr,memchr2};
use std::io::{Cursor, Read, Seek, BufRead, BufReader, Write, BufWriter, stdout, ErrorKind};
use std::fs::File;
// CLI
/// Declares the command-line interface and parses the process arguments.
///
/// Recognised options:
/// * `-c/--column-delimiter DELIMITER` (optional)
/// * `-r/--row-delimiter DELIMITER` (optional)
/// * `-i/--input-file FILE` (required)
/// * `-o/--output-file FILE` (optional)
/// * `-s/--record-size SIZE` (optional)
/// * `-m/--memory` (flag, takes no value)
///
/// Returns the parsed matches; defaults for the optional values are applied
/// by the caller, not here.
pub fn cli<'a>() -> clap::ArgMatches<'a> {
    clap::App::new("Rust Disk Matrix Transpose")
        .version(crate_version!())
        .author("Daniel J. B. Clarke <[email protected]>")
        .about("Transpose large matrices on disk")
        .arg(
            clap::Arg::with_name("column-delimiter")
                .value_name("DELIMITER")
                .short("c")
                .long("column-delimiter")
                .takes_value(true)
                .required(false)
                .help("The delimiter separating each column, defaults to tab")
        )
        .arg(
            clap::Arg::with_name("row-delimiter")
                .value_name("DELIMITER")
                .short("r")
                .long("row-delimiter")
                .takes_value(true)
                .required(false)
                .help("The delimiter separating each row, defaults to newline")
        )
        .arg(
            clap::Arg::with_name("input-file")
                .value_name("FILE")
                .short("i")
                .long("input-file")
                .takes_value(true)
                .required(true)
                // Was "The input file to slice": this tool transposes, it does not slice.
                .help("The input file to transpose")
        )
        .arg(
            clap::Arg::with_name("output-file")
                .value_name("FILE")
                .short("o")
                .long("output-file")
                .takes_value(true)
                .required(false)
                .help("The output file where the transpose will be saved, defaults to stdout")
        )
        .arg(
            clap::Arg::with_name("record-size")
                .value_name("SIZE")
                .short("s")
                .long("record-size")
                .takes_value(true)
                .required(false)
                .help("The maximum record size, too small will result in heap allocations, too big will result in unnecessary reads. Defaults to estimating it based on average col size")
        )
        .arg(
            clap::Arg::with_name("memory")
                .short("m")
                .long("memory")
                .help("In-memory flag--if the file fits in memory, loading it all in memory will work the fastest.")
        )
        .get_matches()
}
/// Marker trait combining [`Read`] and [`Seek`] so that a single trait object
/// (`Box<dyn ReadSeek>`) can stand for either an on-disk file or an in-memory
/// cursor.
trait ReadSeek: Read + Seek {}

/// Blanket implementation: every sized type that can both read and seek is a
/// `ReadSeek`. This covers `File` and `Cursor<Vec<u8>>` (the two types that
/// were previously listed by hand) as well as any other seekable reader.
impl<T: Read + Seek> ReadSeek for T {}
fn main() -> Result<(), Error> {
// process cli arguments
let matches = cli();
let input_file = matches.value_of("input-file").unwrap();
let output_file = matches.value_of("output-file").unwrap_or("-");
let column_delimiter = one_and_only(&mut matches.value_of("column-delimiter").unwrap_or("\t").bytes())?;
let row_delimiter = one_and_only(&mut matches.value_of("row-delimiter").unwrap_or("\n").bytes())?;
let record_size = match matches.value_of("record-size") {
Some(s) => Some(s.parse::<usize>()?),
None => None,
};
let memory = matches.is_present("memory");
if input_file == output_file {
return Err(Error::from("In-place transpose not yet supported"));
}
let input_file_metadata = std::fs::metadata(input_file)?;
if !input_file_metadata.is_file() {
return Err(Error::from(format!("{} is not a file", input_file)));
}
let file_size = input_file_metadata.len();
// Setup progress bar
let progressbar = indicatif::ProgressBar::new(file_size);
progressbar.set_style(
indicatif::ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes} / {total_bytes} {bytes_per_sec} ETA {eta} {msg}")
);
let min_size_update = std::cmp::max(file_size / 100, 1);
// Step 0:
// Prepare the reader
let mut fr: BufReader<Box<dyn ReadSeek>> = if memory {
progressbar.set_message("Step 0/2. Reading file into memory");
progressbar.set_position(0);
let mut buf: Vec<u8> = vec![0; file_size as usize];
let mut f = File::open(input_file)?;
let mut pos: usize = 0;
let mut pos_update: usize = min_size_update as usize;
{ // from read_exact
let mut buf: &mut [u8] = buf.as_mut_slice();
while !buf.is_empty() {
match f.read(buf) {
Ok(0) => break,
Ok(n) => {
pos += n;
if pos >= pos_update {
progressbar.set_position(pos as u64);
pos_update = pos + pos % (min_size_update as usize);
}
let tmp = buf;
buf = &mut tmp[n..];
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(Error::from(e)),
}
}
if !buf.is_empty() {
Err(std::io::Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer"))
} else {
Ok(())
}
}?;
progressbar.finish_and_clear();
BufReader::new(Box::new(Cursor::new(buf)))
} else {
BufReader::new(Box::new(File::open(input_file)?))
};
progressbar.reset();
// Step 1:
// Traverse the file, locating the start position of each line in the file
// and determining the number of columns
// positions of start of each line
let mut cursors: Vec<usize> = Vec::new();
// number of columns
let mut cc = 0;
let mut pos: usize = 0;
{
progressbar.set_message("Step 1/2. Locating starts of each row");
progressbar.set_position(0);
let mut pos_update: usize = min_size_update as usize;
cc += 1;
cursors.push(0);
loop {
if pos >= pos_update {
progressbar.set_position(pos as u64);
pos_update = pos + pos % (min_size_update as usize);
}
let (done, used) = {
let bytes = match fr.fill_buf() {
Ok(bytes) => Ok(bytes),
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => Err(e),
}?;
if !bytes.is_empty() {
let ret: Result<(bool, usize), Error> = match memchr(row_delimiter, &bytes) {
None => {
if cursors.len() == 1 {
cc += bytecount::count(&bytes, column_delimiter);
}
Ok((false, bytes.len()))
},
Some(ind) => {
if cursors.len() == 1 {
cc += bytecount::count(&bytes[..ind], column_delimiter);
}
cursors.push(pos+ind+1);
Ok((false, ind+1))
},
};
ret
} else {
Ok((true, 0))
}
}?;
pos += used;
fr.consume(used);
if done {
break
}
}
progressbar.finish_and_clear();
}
progressbar.reset();
// Step 2:
// Traverse file vertically writing to output file
let mut fr = if memory {
fr.seek_relative(-(pos as i64))?;
fr
} else {
let record_size = match record_size {
Some(s) => s,
None => {
let delimiter_size = cursors.len() * cc - 1; // number of rows (newlines) * columns - last newline (underestimate)
let data_size = (file_size as usize) - delimiter_size; // data is whatever isn't a delimiter
let estimate = (2*data_size) / delimiter_size; // 2* average is hopefully a decent guess for record size;
progressbar.println(format!("Est. record size: {}", estimate));
estimate
},
};
let ret: BufReader<std::boxed::Box<dyn ReadSeek>> = BufReader::with_capacity(record_size, Box::new(File::open(input_file)?));
ret
};
pos = 0;
{
progressbar.set_message("Step 2/2. Transposing");
progressbar.set_position(0);
progressbar.println(format!("({}, {}) => ({}, {})", cursors.len(), cc, cc, cursors.len()));
let mut fw: Box<dyn Write> = if output_file == "-" {
Box::new(BufWriter::new(stdout()))
} else {
Box::new(BufWriter::new(File::create(output_file)?))
};
let mut excess: Vec<u8> = Vec::new();
let mut output_file_size: usize = 0;
let mut output_file_size_update: usize = min_size_update as usize;
for row in (0..cc).into_iter() {
if row != 0 {
fw.write(&[row_delimiter])?;
output_file_size += 1;
}
for (col, cur) in cursors.iter_mut().enumerate() {
if output_file_size >= output_file_size_update {
progressbar.set_position(output_file_size as u64);
output_file_size_update = output_file_size + output_file_size % (min_size_update as usize);
}
let d_pos = (*cur as i64) - (pos as i64);
fr.seek_relative(d_pos)?;
pos = *cur;
excess.clear();
'inner: loop {
let (done, used) = {
let bytes = match fr.fill_buf() {
Ok(bytes) => Ok(bytes),
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => Err(e),
}?;
if !bytes.is_empty() {
let ret: Result<(bool, usize), Error> = match memchr2(column_delimiter, row_delimiter, &bytes) {
Some(ind) => {
if col != 0 {
fw.write(&[column_delimiter])?;
output_file_size += 1;
}
if excess.is_empty() {
fw.write(&bytes[..ind])?;
output_file_size += ind;
*cur += ind+1;
} else {
excess.extend_from_slice(&bytes[..ind]);
fw.write(&excess.as_slice())?;
output_file_size += excess.len();
*cur += excess.len()+1;
excess.clear();
}
Ok((true, ind+1))
},
None => {
// We reached the end of the buffer without a col/row remaining
// either EOF or end of buffer
excess.extend_from_slice(bytes);
Ok((false, bytes.len()))
},
};
ret
} else {
if !excess.is_empty() {
if col != 0 {
fw.write(&[column_delimiter])?;
output_file_size += 1;
}
fw.write(&excess.as_slice())?;
output_file_size += excess.len();
}
Ok((true, 0))
}
}?;
pos += used;
fr.consume(used);
if done {
break 'inner;
}
}
}
}
progressbar.finish_and_clear();
}
Ok(())
}
// Utility functions
/// Pulls the single item out of `it`.
///
/// Returns `Ok(item)` when the iterator yields exactly one item,
/// `Err("Expected one, got none")` when it yields nothing, and
/// `Err("Expected one, got many")` when a second item follows the first.
/// At most two items are consumed from the iterator.
fn one_and_only<T>(it: &mut impl Iterator<Item = T>) -> Result<T, &str> {
    match it.next() {
        None => Err("Expected one, got none"),
        Some(sole) => match it.next() {
            None => Ok(sole),
            Some(_) => Err("Expected one, got many"),
        },
    }
}
// Error handlers
/// Program-wide error type: a newtype around [`std::io::Error`].
///
/// The `From` conversions below let `?` be used on I/O results, integer
/// parsing/conversion results and plain message strings alike. Everything that
/// is not already an I/O error becomes an `ErrorKind::Other` error carrying a
/// message.
#[derive(Debug)]
struct Error(std::io::Error);

/// I/O errors are wrapped unchanged.
impl From<std::io::Error> for Error {
    fn from(source: std::io::Error) -> Self {
        Self(source)
    }
}

/// An owned message becomes an `ErrorKind::Other` I/O error.
impl From<String> for Error {
    fn from(message: String) -> Self {
        Self(std::io::Error::new(std::io::ErrorKind::Other, message))
    }
}

/// A borrowed message is copied into an owned one and converted as above.
impl From<&str> for Error {
    fn from(message: &str) -> Self {
        Self::from(message.to_owned())
    }
}

/// Integer range-conversion failures map to a fixed message.
impl From<std::num::TryFromIntError> for Error {
    fn from(_source: std::num::TryFromIntError) -> Self {
        Self::from("Failed to convert integer, out of bounds")
    }
}

/// String-to-integer parse failures map to a fixed message.
impl From<std::num::ParseIntError> for Error {
    fn from(_source: std::num::ParseIntError) -> Self {
        Self::from("Failed to convert string to integer")
    }
}

/// Present so that `?` type-checks on conversions whose error type is
/// `Infallible`.
impl From<std::convert::Infallible> for Error {
    fn from(_source: std::convert::Infallible) -> Self {
        Self::from("Failed to convert integer, out of bounds")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment