|
# csv_process ---- |
|
|
|
# Description ---- |
|
# |
|
# A wrapping function to read a CSV file in chunks, process them |
|
# with a user specified function and write them back to a CSV. |
|
# |
|
# Usage ---- |
|
# |
|
# csv_process(inputFile = 'iris.txt' |
|
# , outputFile = 'iris_output.txt' |
|
# , inputSep = ':' |
|
# , outputSep = ',' |
|
# , fun = user_specified_function |
|
# , chunkSize = 100 |
|
# , isChunkStrict = FALSE |
|
# , quoteChar = "\"" |
|
# , colClasses = NA |
|
#             , rowThres = 999
|
# , max_cores = 0 |
|
# , ...) |
|
# |
|
# Arguments ---- |
|
# |
|
# inputFile: path to input file |
|
# outputFile: complete path to output file |
|
# inputSep: input file separator |
|
# default is ',' |
|
# outputSep: output file separator |
|
# default is ',' |
|
# fun: user specified function which takes an argument |
|
# named either 'chunk_list' or 'chunk_df' as an input |
|
# chunkSize: number of lines to be read in a chunk |
|
# default is 100 |
|
# isChunkStrict: boolean indicating whether chunkSize should |
|
# divide number of lines(rows) of the inputFile. |
|
# default is FALSE |
|
# rowThres: Number of lines beyond which csv_process |
|
# function goes into parallel mode |
|
# default is 999 |
|
# quoteChar: quote characters with proper escaping in 'chunk_df' |
|
# case |
|
# default is "\"" |
|
# colClasses: column classes in 'chunk_df' case. |
|
# default is NA |
|
# max_cores: maximum number of cores to use in parallel mode. |
|
# default is 0, indicating to use all cores |
|
# ...: arguments to be sent as is to 'fun' |
|
# |
|
# Imports/Depends ---- |
|
# |
|
# Packages `assertthat`, `magrittr`, `parallel`, `iterators`, `itertools`, |
|
# `stringr` |
|
# The function works on POSIX(unix like) systems ONLY |
|
# system functions used: `wc`, `split`, `cat` |
|
# |
|
# Details ---- |
|
# |
|
# The purpose of the function is to provide a framework to read |
|
# CSV file in chunks or line-by-line, apply a |
|
# custom(user-specified) function to the chunk and write it onto |
|
# a new CSV file. This reduces memory overhead significantly. |
|
# |
|
# The function is built to use multiple cores. The files are |
|
# split and fed to multiple cores to do the processing. A |
|
# temporary directory of split inputs and outputs are created in |
|
# the base path of outputFile and later destroyed. This is done |
|
# with system commands like `split`. Make sure that directory is |
|
# writable. Set max_cores to 1, to avoid parallel processing. |
|
# |
|
# fun(user-specified) should take one of the arguments: |
|
# either chunk_df: a dataframe |
|
# or chunk_list: a list |
|
# |
|
# In the case of chunk_df: |
|
# fun(user-specified) should always read a dataframe named chunk_df |
|
# with other optional arguments which are passed as is from ... |
|
# fun should always return a dataframe. These dataframes are |
|
# appended to the outputfile. |
|
# |
|
# In the case of chunk_list: |
|
# fun(user-specified) should always read a list named chunk_list |
|
# with other optional arguments which are passed as is from ... |
|
# fun should always return a list. Each element of this list is |
|
# coerced into a character vector, concatenated by output |
|
# separator and converted to a string. Then, each string written |
|
# to the output CSV file. |
|
# |
|
# Example ---- |
|
# |
|
# Please see the corresponding vignette. |
|
# |
|
# Author ---- |
|
# Srikanth KS (gmail at sri.teach) |
|
# |
|
# function csv_process ---- |
|
csv_process <- function(inputFile
                        , outputFile
                        , inputSep      = ','
                        , outputSep     = ','
                        , fun
                        , chunkSize     = 100
                        , isChunkStrict = FALSE
                        , quoteChar     = "\""
                        , colClasses    = NA
                        , rowThres      = 999
                        , max_cores     = 0
                        , ...){

  # check and load packages ----
  stopifnot(require('assertthat', quietly = TRUE, warn.conflicts = FALSE))
  assert_that(require('magrittr',  quietly = TRUE, warn.conflicts = FALSE))
  assert_that(require('iterators', quietly = TRUE, warn.conflicts = FALSE))
  assert_that(require('itertools', quietly = TRUE, warn.conflicts = FALSE))
  assert_that(require('stringr',   quietly = TRUE, warn.conflicts = FALSE))

  # assertions ----
  assert_that(is.string(inputFile))
  assert_that(file.exists(inputFile))
  inputFile <- normalizePath(inputFile)
  assert_that(is.string(outputFile))
  assert_that(!file.exists(outputFile))
  assert_that(is.string(inputSep))
  assert_that(is.string(outputSep))
  assert_that(is.function(fun))

  # `fun` must accept exactly one of the arguments
  # 'chunk_df' (dataframe mode) or 'chunk_list' (list mode)
  funArgNames <- names(formals(fun))
  hasModeArg  <- c("chunk_df", "chunk_list") %in% funArgNames
  assert_that(sum(hasModeArg) == 1)
  funType <- c("chunk_df", "chunk_list")[hasModeArg]

  assert_that(is.count(chunkSize))
  assert_that(is.flag(isChunkStrict))
  assert_that(is.count(rowThres) && rowThres > chunkSize)
  assert_that(is.count(max_cores) || max_cores == 0)

  # argument list to be passed for 'fun' ----
  argListForFun <- list(...)

  # utility: how many ----
  # Count lines/words/characters of a file with POSIX `wc`.
  # Errors on an unknown `toCount` instead of failing later with
  # an undefined-variable message.
  how_many <- function(path
                       , toCount = "lines"){
    wcFlag <- switch(toCount
                     , lines      = "-l"
                     , words      = "-w"
                     , characters = "-m"
                     , stop("toCount must be one of: ",
                            "'lines', 'words', 'characters'"))

    # shQuote: the path may contain spaces or shell metacharacters
    wcString <- system(paste("wc", wcFlag, shQuote(path))
                       , intern = TRUE)
    if (length(wcString) == 0) {
      stop("`wc` produced no output for: ", path)
    }
    # trimws + "\\s+": BSD/macOS `wc` left-pads the count with spaces,
    # which would otherwise make the first token an empty string
    countToken <- strsplit(trimws(wcString[1]), "\\s+")[[1]][1]
    count <- suppressWarnings(as.integer(countToken))
    if (is.na(count)) {
      stop(wcString)
    }
    count
  }

  # decide about parallel ----
  nOfRows <- how_many(inputFile, "lines")

  # check if chunkSize is not greater than nOfRows
  if (chunkSize > nOfRows) {
    stop("chunkSize is greater than number of rows")
  }

  # check if chunking can be done, strictly
  # (integer modulo, not the floating point is.count(n/k) test)
  if (isChunkStrict && (nOfRows %% chunkSize != 0)) {
    stop("Number of rows is not divisible by chunkSize")
  }

  # use one core if there are not too many rows:
  # the split/merge overhead is not worth it
  if (nOfRows < rowThres) {
    max_cores <- 1
  }
  # scalar decision: plain if/else, not ifelse()
  nCores <- if (max_cores == 0) {
    parallel::detectCores()
  } else {
    min(parallel::detectCores(), max_cores)
  }

  # function to work on single core ----
  # Processes one input file chunk-by-chunk and appends the results
  # of `user_fun` to `output_file`.
  funWrapper <- function(user_fun
                         , input_file
                         , output_file
                         , input_sep
                         , output_sep
                         , chunk_size
                         , quote_char
                         , col_classes
                         , arg_list
                         , fun_type){
    userFun <- match.fun(user_fun)

    if (fun_type == "chunk_df") {
      # 0-based line offsets, one per chunk, served by an iterator
      n_lines   <- how_many(input_file, "lines")
      skipLines <- seq(from = 1
                       , to = n_lines
                       , by = chunk_size) - 1
      skipLines <- ihasNext(iter(skipLines))

      # loop to create a DF chunk, process and append it
      while (hasNext(skipLines)) {
        dfChunk <- read.table(input_file
                              , sep        = input_sep
                              , quote      = quote_char
                              , colClasses = col_classes
                              , skip       = nextElem(skipLines)
                              , nrows      = chunk_size)
        arg_list[["chunk_df"]] <- dfChunk
        write.table(do.call(userFun, arg_list)
                    , file      = output_file
                    , sep       = output_sep
                    , append    = TRUE
                    , col.names = FALSE
                    , row.names = FALSE
                    , quote     = FALSE)
      }
    }

    if (fun_type == "chunk_list") {
      # setup connections; close them on exit
      inputCon <- file(input_file, 'r')
      on.exit(close(inputCon), add = TRUE)
      outputCon <- file(output_file, 'w')
      on.exit(close(outputCon), add = TRUE)

      # loop to write(append) output
      while (TRUE) {
        linesRead <- readLines(inputCon, n = chunk_size)
        if (length(linesRead) == 0) { break }
        # fixed = TRUE: the separator is a literal string, not a
        # regex (a '|' or '.' separator would otherwise split on
        # every character)
        linesList <- strsplit(linesRead
                              , split = input_sep
                              , fixed = TRUE)
        rm(linesRead)

        arg_list[['chunk_list']] <- linesList
        # each returned element is collapsed into one output line
        outLines <- vapply(do.call(userFun, arg_list)
                           , function(x) paste(x, collapse = output_sep)
                           , FUN.VALUE = character(1))
        writeLines(outLines, con = outputCon)
      }
    }
  }

  # parallel and Non parallel case ----
  if (nCores == 1) {
    funWrapper(user_fun      = fun
               , input_file  = inputFile
               , output_file = outputFile
               , input_sep   = inputSep
               , output_sep  = outputSep
               , chunk_size  = chunkSize
               , quote_char  = quoteChar
               , col_classes = colClasses
               , arg_list    = argListForFun
               , fun_type    = funType)
    return(invisible(TRUE))
  }

  # parallel case ----
  # suffix width for `split -a`: enough digits for nCores pieces
  suffixLength <- ceiling(log10(nCores) + 1)

  # split the file into approx nCores pieces; round the piece size
  # up to a multiple of chunkSize so only the last piece can hold a
  # partial chunk (no extra chunkSize added when already divisible)
  tentative <- ceiling(nOfRows / nCores)
  remainder <- tentative %% chunkSize
  nOfLinesPerSplit <- if (remainder == 0) {
    tentative
  } else {
    tentative + (chunkSize - remainder)
  }

  # create path for tempDir in the base path of outputFile
  if (!dir.exists(dirname(outputFile))) {
    dir.create(dirname(outputFile), recursive = TRUE)
  }
  # absolutize outputFile NOW: the `cat` command below runs after a
  # `cd` into the temp dir, where a relative path would resolve to
  # the wrong location (and be destroyed with the temp dir)
  outputFile <- file.path(normalizePath(dirname(outputFile))
                          , basename(outputFile))

  timeStamp <- gsub("[ :]", "_", Sys.time())
  path <- file.path(dirname(outputFile)
                    , paste0('tempDir_', timeStamp))
  dir.create(path, recursive = TRUE)
  dir.create(file.path(path, 'outputs'))
  on.exit(unlink(path, recursive = TRUE), add = TRUE)

  # split inputFile into pieces (x00, x01, ...);
  # shQuote guards paths with spaces/metacharacters
  splitCommand <- paste("cd", shQuote(path), ";"
                        , "split"
                        , "-a", suffixLength
                        , "-l", nOfLinesPerSplit
                        , "-d"
                        , shQuote(inputFile))
  splitStatus <- system(splitCommand, wait = TRUE)
  if (splitStatus != 0) {
    stop("`split` failed with exit status ", splitStatus)
  }

  # padfun: zero-pad a piece index to the split suffix width
  padfun <- function(number, sl = suffixLength){
    stringr::str_pad(as.character(number)
                     , width = sl
                     , side  = 'left'
                     , pad   = '0')
  }

  # passer: process the z-th split piece (0-based, matching the
  # numeric suffixes produced by `split -d`)
  passer <- function(z){
    padded       <- padfun(z)
    inputFile_l  <- file.path(path, paste0('x', padded))
    outputFile_l <- file.path(path, 'outputs', paste0('xo', padded))

    funWrapper(user_fun      = fun
               , input_file  = inputFile_l
               , output_file = outputFile_l
               , input_sep   = inputSep
               , output_sep  = outputSep
               , chunk_size  = chunkSize
               , quote_char  = quoteChar
               , col_classes = colClasses
               , arg_list    = argListForFun
               , fun_type    = funType)
    TRUE
  }

  # number of split pieces: files in `path` minus the 'outputs' dir
  nOfPieces <- length(list.files(path)) - 1

  # mute warnings around mclapply; restore on exit
  oldWarn <- getOption('warn')
  options(warn = -1)
  on.exit(options(warn = oldWarn), add = TRUE)

  # note indexing starting at zero
  results <- parallel::mclapply(seq_len(nOfPieces) - 1
                                , passer
                                , mc.cores = nCores)

  # mclapply captures worker errors as 'try-error' objects in the
  # result list; surface them instead of silently producing a
  # truncated output file
  failed <- vapply(results, inherits, logical(1), what = "try-error")
  if (any(failed)) {
    stop("csv_process: parallel worker failed: "
         , as.character(results[[which(failed)[1]]]))
  }

  # combine the processed files; intermediates are removed by the
  # on.exit(unlink(...)) registered above
  oFiles <- list.files(file.path(path, 'outputs'))
  oFilesString <- paste(shQuote(oFiles), collapse = " ")
  combineCommand <- paste("cd", shQuote(file.path(path, 'outputs')), ";"
                          , "cat", oFilesString
                          , ">", shQuote(outputFile))
  catStatus <- system(combineCommand, wait = TRUE)
  if (catStatus != 0) {
    stop("`cat` failed with exit status ", catStatus)
  }
  return(invisible(TRUE))
}