-
-
Save ShanSabri/b1bdf0951efa0dfee0edeb5509f87e88 to your computer and use it in GitHub Desktop.
Parallelize RDS compression/decompression to improve serialization performance in R
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The functions below use parallelized versions of gzip, xz, and bzip2 to
# improve compression/decompression performance of RDS serialization in R.
# Each function searches for the appropriate program (based on the required
# compression format) and, if found, offloads the compression handling to that
# external program, leaving R free to do the data import/export. If the
# program is not found, the function falls back to R's built-in compression.
# The two main functions (saveRDS and readRDS) mask R's native read and write
# functions. The functions have only been tested on macOS, but they should
# work on any Linux/Unix system.
#
# Requires the following command-line programs: pxz, pbzip2, and pigz.
#
# Run the following line at the command prompt before using the functions.
#
#   brew install pxz pbzip2 pigz
#
library(parallel) | |
# Serialize `object` to `file` as xz-compressed RDS. Uses the multithreaded
# `pxz` program when it is installed; otherwise falls back to
# base::saveRDS(compress = "xz").
#
# object            Any R object.
# file              Path of the file to write.
# threads           Number of compression threads passed to `pxz -T`;
#                   NA (detectCores() can return NA) is treated as 1.
# compression_level xz preset (0-9) passed to `pxz`; 6 is the xz default.
#                   Only used on the pxz path.
#
# Returns invisible(NULL). Errors if pxz reports a non-zero exit status.
saveRDS.xz <- function(object,
                       file,
                       threads = parallel::detectCores(),
                       compression_level = 6) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Probe with Sys.which() first so a missing tool neither spawns a failing
  # shell nor emits "status 127" warnings; the version string guards against
  # an unrelated program that happens to share the name.
  pxz_version <- if (nzchar(Sys.which("pxz"))) {
    suppressWarnings(system2("pxz", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (any(grepl("XZ Utils", pxz_version, fixed = TRUE))) {
    # shQuote() keeps paths containing spaces or shell metacharacters intact.
    cmd <- paste0("pxz -", compression_level, " -T", threads,
                  " > ", shQuote(file))
    con <- pipe(cmd, open = "wb")
    still_open <- TRUE
    # Release the pipe even if serialization fails part-way.
    on.exit(if (still_open) close(con), add = TRUE)
    base::saveRDS(object, file = con)
    still_open <- FALSE
    # close() on a pipe reports the compressor's exit status; non-zero means
    # the written file may be incomplete.
    status <- close(con)
    if (!is.null(status) && status != 0) {
      stop("pxz failed while writing ", file, " (status ", status, ")",
           call. = FALSE)
    }
  } else {
    base::saveRDS(object, file = file, compress = "xz")
  }
  invisible(NULL)
}
# Read an xz-compressed RDS file. Decompresses through the multithreaded
# `pxz` program when it is installed; otherwise uses base::readRDS, which
# also understands xz.
#
# file     Path of the xz-compressed RDS file.
# threads  Number of decompression threads passed to `pxz -T`;
#          NA (detectCores() can return NA) is treated as 1.
#
# Returns the deserialized object.
readRDS.xz <- function(file, threads = parallel::detectCores()) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Probe with Sys.which() first so a missing tool causes no shell errors or
  # "status 127" warnings.
  pxz_version <- if (nzchar(Sys.which("pxz"))) {
    suppressWarnings(system2("pxz", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (!any(grepl("XZ Utils", pxz_version, fixed = TRUE))) {
    return(base::readRDS(file))
  }
  # shQuote() keeps paths containing spaces or shell metacharacters intact.
  con <- pipe(paste0("pxz -d -k -c -T", threads, " ", shQuote(file)))
  on.exit(close(con), add = TRUE)
  base::readRDS(file = con)
}
# Serialize `object` to `file` as gzip-compressed RDS. Uses the multithreaded
# `pigz` program when it is installed; otherwise falls back to
# base::saveRDS(compress = "gzip").
#
# object            Any R object.
# file              Path of the file to write.
# threads           Number of compression threads passed to `pigz -p`;
#                   NA (detectCores() can return NA) is treated as 1.
# compression_level gzip level passed to `pigz` (e.g. 1-9). Only used on the
#                   pigz path.
#
# Returns invisible(NULL). Errors if pigz reports a non-zero exit status.
saveRDS.gz <- function(object,
                       file,
                       threads = parallel::detectCores(),
                       compression_level = 6) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Only run `pigz -V` when the binary exists: with a missing tool, the
  # shell's "pigz: not found" message itself contains "pigz" and would be
  # mistaken for a successful version check.
  pigz_version <- if (nzchar(Sys.which("pigz"))) {
    suppressWarnings(system2("pigz", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (any(grepl("pigz", pigz_version, fixed = TRUE))) {
    # shQuote() keeps paths containing spaces or shell metacharacters intact.
    cmd <- paste0("pigz -c -", compression_level, " -p", threads,
                  " > ", shQuote(file))
    con <- pipe(cmd, open = "wb")
    still_open <- TRUE
    # Release the pipe even if serialization fails part-way.
    on.exit(if (still_open) close(con), add = TRUE)
    base::saveRDS(object, file = con)
    still_open <- FALSE
    # close() on a pipe reports the compressor's exit status; non-zero means
    # the written file may be incomplete.
    status <- close(con)
    if (!is.null(status) && status != 0) {
      stop("pigz failed while writing ", file, " (status ", status, ")",
           call. = FALSE)
    }
  } else {
    base::saveRDS(object, file = file, compress = "gzip")
  }
  invisible(NULL)
}
# Read a gzip-compressed RDS file. Decompresses through the multithreaded
# `pigz` program when it is installed; otherwise uses base::readRDS.
#
# file     Path of the gzip-compressed RDS file.
# threads  Number of threads passed to `pigz -p`;
#          NA (detectCores() can return NA) is treated as 1.
#
# Returns the deserialized object.
readRDS.gz <- function(file, threads = parallel::detectCores()) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Only run `pigz -V` when the binary exists: with a missing tool, the
  # shell's "pigz: not found" message itself contains "pigz" and would be
  # mistaken for a successful version check.
  pigz_version <- if (nzchar(Sys.which("pigz"))) {
    suppressWarnings(system2("pigz", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (!any(grepl("pigz", pigz_version, fixed = TRUE))) {
    return(base::readRDS(file))
  }
  # shQuote() keeps paths containing spaces or shell metacharacters intact.
  con <- pipe(paste0("pigz -d -c -p", threads, " ", shQuote(file)))
  on.exit(close(con), add = TRUE)
  base::readRDS(file = con)
}
# Serialize `object` to `file` as bzip2-compressed RDS. Uses the multithreaded
# `pbzip2` program when it is installed; otherwise falls back to
# base::saveRDS(compress = "bzip2").
#
# object            Any R object.
# file              Path of the file to write.
# threads           Number of processors passed to `pbzip2 -p`;
#                   NA (detectCores() can return NA) is treated as 1.
# compression_level bzip2 level passed to `pbzip2` (e.g. 1-9). Only used on
#                   the pbzip2 path.
#
# Returns invisible(NULL). Errors if pbzip2 reports a non-zero exit status.
saveRDS.bz2 <- function(object,
                        file,
                        threads = parallel::detectCores(),
                        compression_level = 6) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Probe with Sys.which() first so a missing tool causes no shell errors or
  # "status 127" warnings.
  pbz2_version <- if (nzchar(Sys.which("pbzip2"))) {
    suppressWarnings(system2("pbzip2", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (any(grepl("Parallel BZIP2", pbz2_version, fixed = TRUE))) {
    # shQuote() keeps paths containing spaces or shell metacharacters intact.
    cmd <- paste0("pbzip2 -c -", compression_level, " -p", threads,
                  " > ", shQuote(file))
    con <- pipe(cmd, open = "wb")
    still_open <- TRUE
    # Release the pipe even if serialization fails part-way.
    on.exit(if (still_open) close(con), add = TRUE)
    base::saveRDS(object, file = con)
    still_open <- FALSE
    # close() on a pipe reports the compressor's exit status; non-zero means
    # the written file may be incomplete.
    status <- close(con)
    if (!is.null(status) && status != 0) {
      stop("pbzip2 failed while writing ", file, " (status ", status, ")",
           call. = FALSE)
    }
  } else {
    base::saveRDS(object, file = file, compress = "bzip2")
  }
  invisible(NULL)
}
# Read a bzip2-compressed RDS file. Decompresses through the multithreaded
# `pbzip2` program when it is installed; otherwise uses base::readRDS, which
# also understands bzip2.
#
# file     Path of the bzip2-compressed RDS file.
# threads  Number of processors passed to `pbzip2 -p`;
#          NA (detectCores() can return NA) is treated as 1.
#
# Returns the deserialized object.
readRDS.bz2 <- function(file, threads = parallel::detectCores()) {
  threads <- max(1L, as.integer(threads), na.rm = TRUE)
  # Probe with Sys.which() first so a missing tool causes no shell errors or
  # "status 127" warnings.
  pbz2_version <- if (nzchar(Sys.which("pbzip2"))) {
    suppressWarnings(system2("pbzip2", "-V", stdout = TRUE, stderr = TRUE))
  } else {
    character(0)
  }
  if (!any(grepl("Parallel BZIP2", pbz2_version, fixed = TRUE))) {
    return(base::readRDS(file))
  }
  # shQuote() keeps paths containing spaces or shell metacharacters intact.
  con <- pipe(paste0("pbzip2 -d -c -p", threads, " ", shQuote(file)))
  on.exit(close(con), add = TRUE)
  base::readRDS(file = con)
}
# Read an RDS file, picking a decompressor from the file's own contents.
# gzip, xz and bzip2 files are routed to readRDS.gz, readRDS.xz and
# readRDS.bz2 respectively; anything else (e.g. uncompressed RDS) is handed
# to base::readRDS. Masks base::readRDS but only accepts file paths.
#
# file     Path of the RDS file to read.
# threads  Number of decompression threads forwarded to the helper.
#
# Returns the deserialized object. Errors if `file` does not exist.
readRDS <- function(file, threads = parallel::detectCores()) {
  if (!file.exists(file)) {
    stop(paste0(file, " does not exist!"))
  }
  # Identify the container by its leading magic bytes rather than by parsing
  # the output of the external `file` utility: this does not depend on that
  # tool's wording, cannot be fooled by the file name, and yields exactly one
  # answer.
  header <- readBin(file, what = "raw", n = 6L)
  has_magic <- function(magic) {
    length(header) >= length(magic) &&
      identical(header[seq_along(magic)], magic)
  }
  if (has_magic(as.raw(c(0x1f, 0x8b)))) {
    readRDS.gz(file, threads = threads)
  } else if (has_magic(as.raw(c(0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00)))) {
    readRDS.xz(file, threads = threads)
  } else if (has_magic(charToRaw("BZh"))) {
    readRDS.bz2(file, threads = threads)
  } else {
    base::readRDS(file)
  }
}
# Drop-in replacement for base::saveRDS that routes compression through
# saveRDS.gz, saveRDS.bz2 or saveRDS.xz.
#
# object    Any R object.
# file      Path of the file to write.
# compress  TRUE, "gz" or "gzip" for gzip; "bzip", "bzip2", "bz" or "bz2"
#           for bzip2; "xz", "7zip" or "7z" for xz; FALSE for no
#           compression.
# threads   Number of compression threads forwarded to the helper.
#
# Returns invisible(NULL), like base::saveRDS. Errors on an unrecognized or
# missing `compress` value.
saveRDS <- function(object,
                    file = "",
                    compress = TRUE,
                    threads = parallel::detectCores()) {
  # Reject NA / zero-length / vector input up front; otherwise the
  # `compress == FALSE` test below would feed NA or a vector to `if`.
  if (length(compress) != 1L || is.na(compress)) {
    stop("'compress' must be a single logical value or method name",
         call. = FALSE)
  }
  if (compress %in% c(TRUE, "gz", "gzip")) {
    saveRDS.gz(object, file, threads = threads)
  } else if (compress %in% c("bzip", "bzip2", "bz", "bz2")) {
    saveRDS.bz2(object, file, threads = threads)
  } else if (compress %in% c("xz", "7zip", "7z")) {
    saveRDS.xz(object, file, threads = threads)
  } else if (compress == FALSE) {
    # base::saveRDS compresses by default, so "no compression" must be
    # requested explicitly.
    base::saveRDS(object, file = file, compress = FALSE)
  } else {
    stop(paste0(compress, " is not a recognized compression method!"))
  }
  invisible(NULL)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment