
unlimiteR

A framework for processing large CSV files on disk with R's statistical functions, optimized for low memory usage (Unix-based systems only).

  • A set of functions and functionals written in R to process CSV files on disk while keeping memory usage low.

  • The memory usage is indirectly configurable: set the chunkSize argument and use the heuristic below to estimate how much memory is used at any point during execution.

Caveat: For routine jobs such as changing a separator or simple text processing, a shell/bash script is usually better (many times faster, with lower memory usage). The idea with these utilities is to provide a convenient framework for R users to apply R functions to out-of-memory datasets while keeping a handle on memory usage.

Framework

csv_process

| Function | Description |
| --- | --- |
| csv_process (vignette) | Read a CSV file in chunks, process each chunk with a user-specified function and write the results back to a CSV |

A typical workflow
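A minimal usage sketch (hypothetical file names; the signature follows the in-file documentation further down). The user-supplied `fun` must accept an argument named either `chunk_df` or `chunk_list`:

```r
# keep only the first two columns of every chunk (illustrative)
keep_two <- function(chunk_df){ chunk_df[, 1:2] }

csv_process(inputFile = "big.csv"          # hypothetical, headerless CSV
          , outputFile = "big_subset.csv"
          , fun = keep_two
          , chunkSize = 500                # rows read per chunk
          , max_cores = 1)                 # stay single-core
```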

csv_aggregate

| Function | Description |
| --- | --- |
| csv_aggregate | Aggregate a CSV file: a user-specified function is applied to each split and the results are combined back into a CSV |

A typical workflow
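A minimal usage sketch (hypothetical file names; signature and column types follow the in-file documentation further down). The user-supplied `fun` must take a dataframe through an argument named `df`, and `colTypes` must use the SQLite types 'INTEGER', 'REAL' or 'TEXT':

```r
# return each group unchanged (illustrative)
same <- function(df){ df }

csv_aggregate(inputFile = "iris_noheader.csv"   # hypothetical, headerless CSV
            , outputFile = "iris_by_species.csv"
            , groupBy = c("Species")
            , colTypes = list(sl = "REAL", sw = "REAL", pl = "REAL"
                            , pw = "REAL", Species = "TEXT")
            , skipLines = 0
            , fun = same
            , maxCores = 1)
```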

Utilities

Max Memory is a heuristic for the peak memory usage at any point in time: the approximate multiple of the size of the file chunk read into R (the size of the readLines object). A rough estimation sketch follows the table.

| Function | Description | Max Memory |
| --- | --- | --- |
| csv_sampleR | Sample a few lines from a large CSV into R as a dataframe | NA |
| csv_sample | Sample a few lines from a large CSV on disk onto a different CSV file | NA |
| csv_change_separator2 | Change the delimiter in a CSV file (Unix-alike only); thin wrapper binding a sed command with input checks | NA |
| csv_change_separator (deprecated, but works; use csv_change_separator2 instead) | Write delimited files with a separator of more than one byte into a file with a one-byte separator | 2 |
| csv_select_columns (deprecated, but works) | Write selected columns of a delimited file to a delimited file | 3 |
| csv_select_columns2 | Write selected columns of a delimited file to a delimited file using the unix command cut | NA |
| csv_hasEqualCols | Check whether a CSV has an equal number of columns by ensuring that there is an equal number of separators (delimiters) in each line | ? |
| how_many (how_many_lines, how_many_words, how_many_characters) | A thin wrapper around unix's wc to count lines, words and characters | NA |
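A rough way to turn the Max Memory factor into a number before settling on a chunkSize (a sketch; the file name and chunk size are placeholders):

```r
con <- file("big.csv", "r")           # hypothetical file
chunk <- readLines(con, n = 10000)    # candidate chunkSize
close(con)

chunk_mb <- as.numeric(object.size(chunk)) / 2^20
# csv_select_columns has a Max Memory factor of 3, so expect
# roughly this many MB at peak while it runs:
3 * chunk_mb
```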

Motivation

R is a profligate user of memory -- Hadley Wickham

  • Build a statistical ETL to handle datasets of roughly 1 to 10 GB for batch processing.
  • Should work on a single desktop/laptop with around 2 GB of RAM, without breaking the R session and while keeping other programs running -- low memory usage.

--- Philosophy --- At the batch-processing stage, the limiting factor is memory, perhaps more than time or disk space.

Audience

  • Data analysts with a laptop at their disposal, a Unix (Linux) system and R installed should find it handy.
  • Data engineers can safely include these functions in scripts run by cron job(s).

Dependencies

  • POSIX compliant operating system (UNIX alike)
  • Written and tested on R version 3.2.3.

Credits and License

Author: Srikanth KS (talegari)

License: GNU General Public License v3 (GPL3)

Thanks

  • R core members and the open-source community.
  • Authors and maintainers of all R packages.
  • Unix, Linux ... its creators and open-source community.

# csv_aggregate
#
# Description ----
#
# a parallel utility to aggregate a CSV file: a user-specified 'map'
# function is applied to each split and the results are combined back into a CSV.
#
# Usage
#
# same <- function(df){df}
# write.table(iris
# , 'iris.csv'
# , col.names = FALSE
# , row.names = FALSE
# , quote = FALSE
# , sep = ",")
# csv_aggregate(inputFile = "iris.csv"
# , outputFile = "iris_res.csv"
# , inputSep = ","
# , outputSep = ","
# , groupBy = c('Species')
# , colTypes = list(a = 'REAL'
# , b = 'REAL'
# , c = 'REAL'
# , d = 'REAL'
# , Species = "TEXT")
# , skipLines = 0
# , fun = same
# , maxCores = 1)
#
# Arguments ----
#
# inputFile: path to input file CSV without quotes
# outputFile: full path where output file is created with the name
# inputSep: separator used in the input file (one byte)
# default is ','
# outputSep: separator used in the output file (one byte)
# default is ','
# groupBy: name of the column(s) to be grouped by before fun is applied
# colTypes: named list with sqlite columns types among 'INTEGER', 'REAL'
# and 'TEXT'. Names form the names of the columns.
# fun: name of the function to be applied to each split.
# should have a mandatory argument called 'df' to read a
# dataframe and should output a dataframe.
# skipLines: number of lines to skip while reading a CSV file into sqlite
# maxCores: number of cores to use. To use all cores, use -1(default).
# ...: arguments to be sent as is to 'fun'
#
# Imports/Depends ----
#
# Packages `assertthat`, `magrittr`, `parallel`, `iterators`, `RSQLite`,
# `foreach`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `wc`, `split`, `cat`
#
# Details ----
#
# The function works as a near drop-in replacement for R's `tapply` or
# `aggregate`. The CSV is read into an RSQLite database table. Unique
# values of the groupBy columns are stored into a CSV. Then, concurrent
# processes are run to import subsets of the data into R and apply the
# user-specified function. All intermediate data is deleted after the
# function runs.
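#
# Illustratively (with groupBy = c("Species")), the queries issued are of
# the form
#
#   SELECT DISTINCT Species FROM csvTable
#
# and, for each distinct value, something like
#
#   SELECT * FROM csvTable WHERE Species = 'setosa'
#
# whose result is handed to `fun` as its `df` argument.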
#
# Example ----
#
# Please see the corresponding vignette.
#
# Author ----
#
# Srikanth KS (gmail at sri.teach)
#
# function csv_aggregate ----
csv_aggregate <- function(inputFile
, outputFile
, inputSep = ","
, outputSep = ","
, groupBy
, colTypes
, fun
, skipLines = 1
, maxCores = -1
, ...){
# load libraries ----
stopifnot(require("assertthat"
, quietly = TRUE
, warn.conflicts = FALSE
, character.only = TRUE))
loadPackage <- function(pkgName){
invisible(
assert_that(
suppressPackageStartupMessages(
require(pkgName
, quietly = TRUE
, warn.conflicts = FALSE
, character.only = TRUE
)
)
)
)
}
loadPackage("magrittr")
loadPackage("iterators")
loadPackage("foreach")
loadPackage("parallel")
loadPackage("RSQLite")
# assertions ----
assert_that(file_test("-f", inputFile))
inputFile <- normalizePath(inputFile)
assert_that(!file.exists(outputFile))
assert_that(is.string(inputSep) && nchar(inputSep) == 1)
assert_that(is.string(outputSep))
assert_that(is.atomic(groupBy) &&
length(groupBy) >= 1 &&
class(groupBy) == "character"
)
assert_that(is.list(colTypes) &&
length(colTypes) > 1 &&
!is.null(names(colTypes))
)
assert_that(all(colTypes %in% c('INTEGER', 'REAL', 'TEXT')))
assert_that(all(groupBy %in% names(colTypes)))
assert_that(any(names(colTypes) %in% groupBy == FALSE))
assert_that(is.function(fun))
assert_that(is.count(skipLines) || skipLines == 0)
assert_that(is.count(maxCores) || maxCores == -1)
# argument list to be passed for 'fun' ----
argListForFun <- list(...)
# helper functions ----
how_many_lines <- function(path){
wcString <- system(paste0("wc ", "-l", " ", path)
, intern = TRUE)
number_tentative <- strsplit(wcString, " ")[[1]][1]
if (number_tentative != "wc:") {
return(as.integer(number_tentative))
} else {
stop(wcString)
}
}
# setup db connection ----
timestamp <- gsub(" ","_", Sys.time()) %>% gsub(":","_",.)
tempDir <- file.path(dirname(outputFile)
, paste0("csvaggregate_", timestamp))
dir.create(tempDir, recursive = TRUE)
dbPath <- file.path(tempDir, "dbfromcsv.sqlite")
db <- dbConnect(SQLite(), dbname = dbPath)
on.exit(if (dbIsValid(db)) {dbDisconnect(db)}, add = TRUE)
on.exit(unlink(tempDir, recursive = TRUE), add = TRUE)
# read the CSV into csvTable ----
dbWriteTable(db
, name = "csvTable"
, value = inputFile
, field.types = colTypes
, header = FALSE
, skip = skipLines
, sep = inputSep
, row.names = FALSE)
# write the results onto distincts.CSV ----
dir.create(file.path(tempDir, "d"))
distinctsPath <- file.path(tempDir, "d","distincts.csv")
file.create(distinctsPath)
queryText_distincts <-
paste0("SELECT DISTINCT "
, do.call(paste
, as.list(c(groupBy, sep = ",")))
, " FROM csvTable")
result_distinct <- dbSendQuery(db, queryText_distincts)
rm(queryText_distincts)
while (!dbHasCompleted(result_distinct)) {
fetch(result_distinct, n = 500) %>%
write.table(file = distinctsPath
, sep = inputSep
, row.names = FALSE
, col.names = FALSE
, quote = FALSE
, append = TRUE)
}
dbClearResult(result_distinct)
dbDisconnect(db)
# split distincts.CSV into pieces ----
if (maxCores != -1) {
nCores <- min(maxCores, parallel::detectCores())
} else {
nCores <- detectCores()
}
nLines <- how_many_lines(distinctsPath)
nOfLinesPerSplit <- min(nLines, ceiling(nLines/nCores))
splitCommand <- paste("cd "
, file.path(tempDir, "d")
, " ; "
, "split "
, "-a "
, "3"
, " -l "
, nOfLinesPerSplit
, " -d "
, distinctsPath)
system(splitCommand, wait = TRUE)
unlink(distinctsPath)
dir.create(file.path(tempDir, "output"))
# create multiple copies of the DB ----
splitFileNames <- list.files(file.path(tempDir, "d"))
dbnamesSplit <- strsplit(splitFileNames, split = "")
dbnames <- vapply(dbnamesSplit
, function(x){
tail(x, n = -1) %>%
paste(collapse = "") %>%
paste("dbfromcsv_"
, .
, ".sqlite"
, sep = "")
}
, character(1))
temp <- lapply(1:length(splitFileNames)
, function(x){
system(
paste0("cp "
, file.path(tempDir, "dbfromcsv.sqlite")
, " "
, file.path(tempDir, dbnames[x]))
, wait = TRUE)
}
)
rm(temp)
# function to run in a parallel loop ----
mapred <- function(filePath){
iro <- ireadLines(filePath)
foreach(al = iro) %do% {
fieldValues <- strsplit(al, inputSep)[[1]]
groupByEquals <-
vapply(seq_along(groupBy)
, function(x){
paste0(groupBy[[x]]
, " = "
, if (colTypes[groupBy[[x]]] != "TEXT") {
fieldValues[[x]]
} else {
paste0("'", fieldValues[[x]], "'")
}
)
}
, character(1)
)
queryText <- paste0("SELECT * FROM csvTable WHERE "
, c(groupByEquals, sep = " AND ") %>%
as.list %>%
do.call(paste,.)
)
rm(groupByEquals)
# file identifier
fi <- strsplit(basename(filePath), split = "")[[1]] %>%
tail(n = -1) %>%
paste(collapse = "")
localdb <- paste("dbfromcsv_", fi, ".sqlite"
, collapse = ""
, sep = "")
db <- dbConnect(SQLite(), dbname = file.path(tempDir, localdb))
argListForFun[["df"]] <- dbGetQuery(db, queryText)
dbDisconnect(db)
outputDf <- do.call(fun, argListForFun)
rowLength <- nrow(outputDf)
outputFileBasePath <- paste("o",fi, sep = "")
lapply(fieldValues, function(x){rep(x,rowLength)}) %>%
data.frame(outputDf) %>%
write.table(file.path(tempDir, "output", outputFileBasePath)
, row.names = FALSE
, col.names = FALSE
, quote = FALSE
, sep = outputSep
, append = TRUE)
}
return(invisible(TRUE))
}
# run parallel loop ----
disPath <- file.path(tempDir, "d")
distincts <- vapply(list.files(disPath)
, function(x){file.path(disPath, x)}
, character(1))
temp <- mclapply(distincts, mapred, mc.cores = nCores)
# collect results ----
outputsPath <- file.path(tempDir, "output")
fullOutputsPaths <- vapply(list.files(outputsPath)
, function(x){file.path(outputsPath, x)}
, character(1)) %>% paste(collapse = " ")
catCommand <- paste0("cd "
, dirname(outputFile)
, " ; "
, "cat "
, fullOutputsPaths
, " > "
, outputFile)
system(catCommand, wait = TRUE)
return(invisible(TRUE))
}
# csv_cbind
#
# Description ----
#
# concatenate files horizontally
#
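# Usage (illustrative, hypothetical file names) ----
#
# csv_cbind(files = c("left.csv", "right.csv")
#           , outputFile = "combined.csv"
#           , separator = ",")
#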
# Arguments ----
#
# files : character vector of input files
# outputFile : path to the output file
# separator : a string of length 1
#
# Value ----
#
# returns an invisible TRUE if `paste` does not encounter an error;
# otherwise the function halts with an error
#
# Imports/Depends ----
#
# Packages `assertthat`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `paste`
#
# Details ----
#
# does not check whether the files have the same number of rows
# the error from `paste` is not shielded
#
# Author ----
# Srikanth KS (gmail at sri.teach)
#
csv_cbind <- function(files
, outputFile
, separator = ","){
# argument check
stopifnot(require(assertthat
, quietly = TRUE
, warn.conflicts = FALSE
)
)
assert_that(all(vapply(files
, function(afile) { file_test("-f", afile) }
, logical(1)
)
) == TRUE
)
files <- normalizePath(files)
assert_that(!file.exists(outputFile))
assert_that(is.writeable(dirname(outputFile)))
assert_that(is.string(separator) && nchar(separator) == 1)
files_spaced <- gsub(", ", " ", toString(files))
# paste command
command <- paste0("paste "
, "-d "
, "'"
, separator
, "'"
, " "
, files_spaced
, " > "
, outputFile
)
system(command, intern = TRUE)
return(invisible(TRUE))
}
# csv_change_separator.R
# utility to write delimited files with a separator of
# more than one byte into a file with a one-byte separator.
# This was not written to be fast, but to use low memory
# without breaking the R session. chunkSize is the number of rows
# read/written at once. The rule of thumb is that at any point
# of time during the execution, we use twice the memory of the
# readLines object holding those rows.
# Written for REPL/interactive use; if you need a batch processor,
# better use awk or perl.
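# Illustrative usage (hypothetical file names; inputSep is treated as a
# perl regular expression by gsub):
# csv_change_separator(inputFile = "ratings.dat"
#                      , outputFile = "ratings.csv"
#                      , inputSep = "::"
#                      , outputSep = ",")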
csv_change_separator <- function(inputFile
, outputFile
, inputSep
, outputSep = ','
, chunkSize = 10
, verbose = TRUE){
stopifnot(require('magrittr', warn.conflicts = FALSE))
start_time <- Sys.time()
# check overwriting a file
stopifnot( !file.exists(outputFile))
# check if input file exists
stopifnot( file.exists(inputFile))
# setup connections
inputCon <- file(inputFile, 'r')
outputCon <- file(outputFile, 'w')
# loop to write(append) output
while (TRUE) {
lines <- readLines(inputCon, n = chunkSize)
if (length(lines) == 0) { break }
lines %>%
gsub(inputSep,outputSep, ., perl = TRUE) %>%
writeLines(con = outputCon)
}
# closing connections
close(inputCon)
close(outputCon)
# verbose output
if (verbose == TRUE) {
cat('\n----\n')
cat('Processed'
, inputFile
, 'with'
, inputSep
, 'as separator and generated'
, outputFile
, 'with'
, outputSep
, 'as separator.')
cat('\n----\n')
end_time <- Sys.time()
cat('start time:\n')
print(start_time)
cat('end time:\n')
print(end_time)
cat('\n')
print(end_time - start_time)
cat('----')
}
}
# csv_change_separator2 ----
#
# Description ----
#
# change the delimiter in a CSV file (Unix alike only)
# thin wrapper binding a `sed` command with input checks.
#
# Usage ----
#
# csv_change_separator2(inputFile = "/ratings.dat"
# , outputFile = "/ratings_ouput.csv"
# , inputSep = "::"
# , outputSep = ",")
#
# Arguments ----
#
# inputFile: path to input file
# outputFile: full path of the output file
# inputSep: separator used in the input file
# outputSep: separator to be used in the output file
# defaults to ','
#
# Imports/Depends ----
# uses system command: `sed`
# package 'assertthat'
#
# Author ----
# Srikanth KS (gmail at sri.teach)
#
# function csv_change_separator2 ----
#
csv_change_separator2 <- function(inputFile
, outputFile
, inputSep
, outputSep = ","){
# argument check
stopifnot(require(assertthat
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(file_test("-f", inputFile))
inputFile <- normalizePath(inputFile)
assert_that(!file.exists(outputFile))
assert_that(is.writeable(dirname(outputFile)))
assert_that(is.string(inputSep))
assert_that(is.string(outputSep))
# sed command
# use '|' as the sed delimiter and quote the expression, so that a
# separator containing '/' needs no escaping (the separator itself
# must not contain '|')
sedCommand <- paste0("sed 's|"
, inputSep
, "|"
, outputSep
, "|g' "
, inputFile
, " > "
, outputFile)
sedMsg <- system(sedCommand, intern = TRUE)
if (identical(nchar(sedMsg), integer(0))) {
return(invisible(TRUE))
} else {
# cleanup if required
if (file.exists(outputFile)) {unlink(outputFile)}
stop(sedMsg)
}
}
# csv_hasEqualCols ----
# Description ----
# To check whether a CSV has equal number of columns by
# ensuring that there are equal number of separators(delimiters)
# in each line.
#
# Usage ----
# csv_hasEqualCols('iris.csv'
# , sep = ","
# , skip = 0
# , ignoreEmptyLines = TRUE
# , allowEmpty = FALSE
# , trimWs = TRUE)
#
# Arguments ----
# path: path to CSV file
# sep: the delimiter (space is not a valid delimiter)
# default is ","
# skip: number of lines to skip
# default is 0
# chunkSize: number of lines read at a time
# default is 1000
# ignoreEmptyLines: whether empty lines are to be ignored
# default is TRUE
# allowEmpty: whether the data on a line between two
# separators can be empty
# default is FALSE
# trimWs: whether the data on a line between two
# separators is to be trimmed of white space on
# both sides
# default is TRUE
#
# Imports ----
# Packages `assertthat` and `magrittr`
#
# Details ----
# The function always returns a boolean.
# Messages indicate the kind of problem along with the chunk number.
# Knowing the chunk size and skip, the approximate location of
# the problem can be found.
#
# Author ----
# Srikanth KS (gmail at sri.teach)
# function csv_hasEqualCols ----
csv_hasEqualCols <- function(path
, sep = ","
, skip = 0
, chunkSize = 1000
, ignoreEmptyLines = TRUE
, allowEmpty = FALSE
, trimWs = TRUE){
# validity checks ----
stopifnot(require("assertthat"))
assert_that(require("magrittr"))
assert_that(is.string(path))
assert_that(file.exists(path))
assert_that(is.string(sep))
assert_that(skip == 0 || is.count(skip))
assert_that(is.count(chunkSize))
assert_that(is.flag(ignoreEmptyLines))
assert_that(is.flag(allowEmpty))
assert_that(is.flag(trimWs))
#
# message collection ----
differMsg <-
"(csv_hasEqualCols)number of columns differ at chunk "
emptyStringMsg <-
"(csv_hasEqualCols)Empty string encountered at chunk "
emptyLineMsg <-
"(csv_hasEqualCols)Empty line encountered at chunk "
#
# fun to give the distinct col nums in a character vector ----
getLength <- function(charVecs
, ignore_empty_lines = ignoreEmptyLines
, allow_empty = allowEmpty
, trim_ws = trimWs){
# remove those lines which are empty
# charVecs won't have just empty strings
# in that case, loop is skipped
filledLinesNumbers <- which(trimws(charVecs) != "")
if (ignore_empty_lines) {
charVecs <- charVecs[filledLinesNumbers]
} else {
if (!identical(filledLinesNumbers, 1:length(charVecs))) {
message(paste0(emptyLineMsg, as.character(chunk)))
return(FALSE)
}
}
charList <- strsplit(charVecs, split = sep)
if (allow_empty == FALSE) {
if (trim_ws) {
charCountFun <- function(x){nchar(trimws(x))}
} else {
charCountFun <- function(x){nchar(x)}
}
# check if any string is NULL in a character vector
anyStringNULL <- function(charVec){
any(vapply(charVec
, charCountFun
, FUN.VALUE = integer(1)) == 0)
}
if (any(vapply(charList
, anyStringNULL
, FUN.VALUE = logical(1)) == TRUE)) {
message(paste0(emptyStringMsg, as.character(chunk)))
return(FALSE)
}
}
vapply(charList, length, FUN.VALUE = integer(1)) %>%
unique
}
#
# define and close connection, and handle skip ----
con <- file(path, "r")
on.exit(close(con), add = TRUE)
# move the readLines pointer past the lines to be skipped
# (do nothing when skip is 0; seq_len avoids the 1:0 pitfall)
if (skip > 0 && skip <= chunkSize) {
skipped <- readLines(con, skip)
rm(skipped)
} else if (skip > chunkSize) {
quo <- skip %/% chunkSize
rem <- skip - (quo * chunkSize)
for (i in seq_len(quo)) {
skipped <- readLines(con, chunkSize)
rm(skipped)
}
if (rem != 0) {
skipped <- readLines(con, rem)
rm(skipped)
}
}
#
# loop through chunks ----
chunk <- 0L
# create NULL currentLength variable for first use
currentLength <- NULL
while (TRUE) {
# read lines and exit positively when file ends
lines <- readLines(con, chunkSize)
if (length(lines) == 0L) {return(TRUE)}
# keep track of the chunk
chunk <- chunk + 1L
# if all the lines in the chunk are missing:
# either return FALSE or skip loop
uniqueLines <- unique(trimws(lines))
if (length(uniqueLines) == 1 && uniqueLines == "") {
if (ignoreEmptyLines) {
next
} else {
message(paste0(emptyLineMsg, as.character(chunk)))
return(FALSE)
}
}
rm(uniqueLines)
# this can be either FALSE or col lengths
newLength <- getLength(charVecs = lines)
# case 1: when it is FALSE
if (is.flag(newLength)) {
return(FALSE)
}
# case 2: when it has col lengths
if (length(newLength) > 1L) {
message(paste0(differMsg, as.character(chunk)))
return(FALSE)
}
if (!is.null(currentLength) && currentLength != newLength) {
message(paste0(differMsg, as.character(chunk)))
return(FALSE)
} else {
currentLength <- newLength
}
}
}
# csv_process ----
# Description ----
#
# A wrapping function to read a CSV file in chunks, process each chunk
# with a user-specified function and write the results back to a CSV.
#
# Usage ----
#
# csv_process(inputFile = 'iris.txt'
# , outputFile = 'iris_output.txt'
# , inputSep = ':'
# , outputSep = ','
# , fun = user_specified_function
# , chunkSize = 100
# , isChunkStrict = FALSE
# , quoteChar = "\""
# , colClasses = NA
# , rowThres = 1000
# , max_cores = 0
# , ...)
#
# Arguments ----
#
# inputFile: path to input file
# outputFile: complete path to output file
# inputSep: input file separator
# default is ','
# outputSep: output file separator
# default is ','
# fun: user specified function which takes an argument
# named either 'chunk_list' or 'chunk_df' as an input
# chunkSize: number of lines to be read in a chunk
# default is 100
# isChunkStrict: boolean indicating whether chunkSize should
# divide number of lines(rows) of the inputFile.
# default is FALSE
# rowThres: Number of lines beyond which csv_process
# function goes into parallel mode
# default is 999
# quoteChar: quote characters with proper escaping in 'chunk_df'
# case
# default is "\""
# colClasses: column classes in 'chunk_df' case.
# default is NA
# max_cores: maximum number of cores to use in parallel mode.
# default is 0, indicating to use all cores
# ...: arguments to be sent as is to 'fun'
#
# Imports/Depends ----
#
# Packages `assertthat`, `magrittr`, `parallel`, `iterators`, `itertools`,
# `stringr`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `wc`, `split`, `cat`
#
# Details ----
#
# The purpose of the function is to provide a framework to read a
# CSV file in chunks or line by line, apply a
# custom (user-specified) function to each chunk and write the result
# onto a new CSV file. This reduces memory overhead significantly.
#
# The function is built to use multiple cores. The files are
# split and fed to multiple cores to do the processing. A
# temporary directory of split inputs and outputs is created in
# the base path of outputFile and later destroyed. This is done
# with system commands like `split`. Make sure that directory is
# writable. Set max_cores to 1 to avoid parallel processing.
#
# fun(user-specified) should take one of the arguments:
# either chunk_df: a dataframe
# or chunk_list: a list
#
# In the case of chunk_df:
# fun(user-specified) should always read a dataframe named chunk_df
# with other optional arguments which are passed as is from ...
# fun should always return a dataframe. These dataframes are
# appended to the outputfile.
#
# In the case of chunk_list:
# fun(user-specified) should always read a list named chunk_list
# with other optional arguments which are passed as is from ...
# fun should always return a list. Each element of this list is
# coerced into a character vector, concatenated by the output
# separator and converted to a string. Then, each string is written
# to the output CSV file.
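#
# Illustrative examples (not from the original docs) of the two accepted
# shapes of fun:
#
# keep_col_df <- function(chunk_df){ chunk_df[, 1, drop = FALSE] }
#
# keep_col_list <- function(chunk_list){
#   lapply(chunk_list, function(x){ x[1] })
# }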
#
# Example ----
#
# Please see the corresponding vignette.
#
# Author ----
# Srikanth KS (gmail at sri.teach)
#
# function csv_process ----
csv_process <- function(inputFile
, outputFile
, inputSep = ','
, outputSep = ','
, fun
, chunkSize = 100
, isChunkStrict = FALSE
, quoteChar = "\""
, colClasses = NA
, rowThres = 999
, max_cores = 0
, ...){
# check and load packages ----
stopifnot(require('assertthat'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('magrittr'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('iterators'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('itertools'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('stringr'
, quietly = TRUE
, warn.conflicts = FALSE))
# assertions ----
assert_that(is.string(inputFile))
assert_that(file.exists(inputFile))
inputFile <- normalizePath(inputFile)
assert_that(is.string(outputFile))
assert_that(!file.exists(outputFile))
assert_that(is.string(inputSep))
assert_that(is.string(outputSep))
assert_that(is.function(fun))
funAtt <- attributes(formals(fun))[["names"]]
booleans <- c("chunk_df", "chunk_list") %in% funAtt
assert_that(length(which(booleans == TRUE)) == 1)
funType <- c("chunk_df", "chunk_list")[which(booleans == TRUE)]
assert_that(is.count(chunkSize))
assert_that(is.flag(isChunkStrict))
assert_that(is.count(rowThres) && rowThres > chunkSize)
assert_that(is.count(max_cores) || max_cores == 0)
# argument list to be passed for 'fun' ----
argListForFun <- list(...)
# utility: how many ----
how_many <- function(path
, toCount = "lines"){
switch(toCount
, lines = assign("tc", "-l")
, words = assign("tc", "-w")
, characters = assign("tc", "-m"))
wcString <- system(paste0("wc ", tc, " ", path)
, intern = TRUE)
number_tentative <- strsplit(wcString, " ")[[1]][1]
if (number_tentative != "wc:") {
return(as.integer(number_tentative))
} else {
stop(wcString)
}
}
# decide about parallel ----
nOfRows <- how_many(inputFile, "lines")
# check if chunkSize is not greater than nOfRows
if (chunkSize > nOfRows) {
stop("chunkSize is greater than number of rows")
}
# check if chunking can be done, strictly
if (isChunkStrict && !is.count(nOfRows/chunkSize)) {
stop("Number of rows is not divisible by chunkSize")
}
# use one core if there are not too many rows
if (nOfRows < rowThres) {
max_cores <- 1
}
nCores <- ifelse(max_cores == 0
, parallel::detectCores()
, min(parallel::detectCores(), max_cores)
)
# function to work on single core ----
funWrapper <- function(user_fun
, input_file
, output_file
, input_sep
, output_sep
, chunk_size
, quote_char
, col_classes
, arg_list
, fun_type){
userFun <- match.fun(user_fun)
if (fun_type == "chunk_df") {
# count lines and set up a skip iterator
n_lines <- how_many(input_file, "lines")
skipLines <- seq(from = 1
, to = n_lines
, by = chunk_size) - 1
skipLines <- ihasNext(iter(skipLines))
# loop to create a DF chunk and process
while (hasNext(skipLines)) {
dfChunk <- read.table(input_file
, sep = input_sep
, quote = quote_char
, colClasses = col_classes
, skip = nextElem(skipLines)
, nrows = chunk_size)
arg_list[["chunk_df"]] <- dfChunk
write.table(do.call(userFun, arg_list)
, file = output_file
, sep = output_sep
, append = TRUE
, col.names = FALSE
, row.names = FALSE
, quote = FALSE)
}
}
if (fun_type == "chunk_list") {
# setup connections
inputCon <- file(input_file, 'r')
outputCon <- file(output_file, 'w')
# close connections on exit
on.exit(close(inputCon), add = TRUE)
on.exit(close(outputCon), add = TRUE)
# loop to write(append) output
while (TRUE) {
linesRead <- readLines(inputCon, n = chunk_size)
if (length(linesRead) == 0) { break }
linesList <- strsplit(linesRead, split = input_sep)
rm(linesRead)
arg_list[['chunk_list']] <- linesList
do.call(userFun, arg_list) %>%
vapply(function(x){
do.call(paste
,c(x, sep = output_sep) %>% as.list)
}
, FUN.VALUE = character(1)) %>%
writeLines(con = outputCon)
}
}
}
# parallel and Non parallel case ----
if (nCores == 1) {
funWrapper(user_fun = fun
, input_file = inputFile
, output_file = outputFile
, input_sep = inputSep
, output_sep = outputSep
, chunk_size = chunkSize
, quote_char = quoteChar
, col_classes = colClasses
, arg_list = argListForFun
, fun_type = funType)
return(invisible(TRUE))
} else {
# parallel case
suffixLength <- ceiling(log10(nCores) + 1)
# split the file into approx nCores number of pieces
tentative <- ceiling(nOfRows/nCores)
toadd <- chunkSize - (tentative %% chunkSize)
nOfLinesPerSplit <- tentative + toadd
# create path for tempDir
if (!dir.exists(dirname(outputFile))) {
dir.create(dirname(outputFile))
}
path <- file.path(dirname(outputFile)
, paste0('tempDir_'
, gsub(" ","_", Sys.time()) %>%
gsub(":","_",.))
)
dir.create(path, recursive = TRUE)
dir.create(file.path(path, 'outputs'))
on.exit(unlink(path, recursive = TRUE), add = TRUE)
# split inputFile into pieces
splitCommand <- paste("cd "
, path
, " ; "
, "split "
, "-a "
, suffixLength %>% as.character
, " -l "
, nOfLinesPerSplit
, " -d "
, inputFile)
system(splitCommand, wait = TRUE)
# padfun
padfun <- function(number, sl = suffixLength){
stringr::str_pad(as.character(number)
, width = sl
, side = 'left'
, pad = '0')
}
# passer
passer <- function(z){
# define local variables to be passed to funWrapper
padfunVal <- padfun(z)
inputFile_l <- paste0('x', padfunVal) %>%
file.path(path, .)
outputFile_l <- paste0('xo', padfunVal) %>%
file.path(path, 'outputs', .)
# invoke funWrapper
funWrapper(user_fun = fun
, input_file = inputFile_l
, output_file = outputFile_l
, input_sep = inputSep
, output_sep = outputSep
, chunk_size = chunkSize
, quote_char = quoteChar
, col_classes = colClasses
, arg_list = argListForFun
, fun_type = funType)
return(TRUE)
}
# count includes 'outputs' folder too
nOfFiles <- list.files(file.path(path)) %>% length
# mute warnings
oldWarn <- getOption('warn')
options(warn = -1)
on.exit(options(warn = oldWarn), add = TRUE)
# note indexing starting at zero
temp <- parallel::mclapply(0:(nOfFiles - 2)
, passer
, mc.cores = nCores
)
# combine the processed files and remove intermediates
oFilesStrings <- list.files(file.path(path, 'outputs'))
oFilesString <- do.call('paste', c(oFilesStrings
, sep = ' ') %>% as.list)
combineCommand <- paste("cd "
, file.path(path, 'outputs')
, " ; "
, 'cat '
, oFilesString
, ' > '
, outputFile)
system(combineCommand, wait = TRUE)
return(invisible(TRUE))
}
}
# csv_rbind
#
# Description ----
#
# concatenate files vertically
#
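# Usage (illustrative, hypothetical file names; the header row of each
# file after the first is dropped when handleHeader is TRUE) ----
#
# csv_rbind(files = c("jan.csv", "feb.csv", "mar.csv")
#           , outputFile = "q1.csv"
#           , handleHeader = TRUE)
#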
# Arguments ----
#
# files : character vector of input files
# outputFile : path to the output file
# handleHeader : boolean, defaults to TRUE
#
# Value ----
#
# returns an invisible TRUE if `cat` does not encounter an error;
# otherwise the function halts with an error
#
# Imports/Depends ----
#
# Packages `assertthat`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `cat`
#
# Details ----
#
# if handleHeader is TRUE, the first row of every file except the first is discarded
# does not check whether the files have the same number of columns
# the error from `cat` is not shielded
#
# Author ----
# Srikanth KS (gmail at sri.teach)
#
devtools::source_url("https://gist.github.com/talegari/808697ab90117571d5db/raw/7669e8e86ef1b97fb3abed27ded90437ae625f3c/csv_remove_toplines")
devtools::source_url("https://gist.github.com/talegari/808697ab90117571d5db/raw/7669e8e86ef1b97fb3abed27ded90437ae625f3c/how_many.R")
csv_rbind <- function(files
, outputFile
, handleHeader = TRUE){
# argument check
stopifnot(require(assertthat
, quietly = TRUE
, warn.conflicts = FALSE
)
)
files <- normalizePath(files)
assert_that(all(vapply(files
, function(afile) { file_test("-f", afile) }
, logical(1)
)) == TRUE)
assert_that(!file.exists(outputFile))
assert_that(is.writeable(dirname(outputFile)))
# remove header lines
if(handleHeader){
# create dir at the same location as the first of the files
newDir <- file.path(dirname(outputFile)
,paste0("csv_rbind", gsub(" ", "_", Sys.time()))
)
dir.create(newDir)
# remove topline from all but first file
for(afile in files[-1]){
csv_remove_toplines(inputFile = afile
, outputFile = file.path(newDir
, basename(afile)
)
)
}
normalized_paths <- normalizePath(
vapply(list.files(newDir)
, function(x) {file.path(newDir, x)}
, character(1)
)
)
files_spaced <- toString(c(files[[1]], normalized_paths))
files_spaced <- gsub(", ", " ", files_spaced)
# cat command
command <- paste0("cat "
, files_spaced
, " > "
, outputFile
)
system(command, intern = TRUE)
unlink(newDir, recursive = TRUE)
} else {
files_spaced <- gsub(", ", " ", toString(files))
# cat command
command <- paste0("cat "
, files_spaced
, " > "
, outputFile
)
system(command, intern = TRUE)
}
return(invisible(TRUE))
}
# csv_remove_toplines
#
# Description ----
#
# remove some "number" of lines from the beginning of the file
# number defaults to 1; usually we want to remove the header
#
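# Usage (illustrative, hypothetical file names) ----
#
# csv_remove_toplines(inputFile = "with_header.csv"
#                     , outputFile = "without_header.csv"
#                     , number = 1L)
#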
# Arguments ----
#
# inputFile : path to input file
# outputFile : path to the output file
# number : an integer, at least 1
#
# Value ----
#
# returns an invisible TRUE if sed does not encounter an error.
# else the function halts with an error
#
# Imports/Depends ----
#
# Packages `assertthat`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `sed`
#
# Details ----
#
# The error from "sed" is not shielded
#
# Author ----
# Srikanth KS (gmail at sri.teach)
#
csv_remove_toplines <- function(inputFile
, outputFile
, number = 1L){
# argument check
stopifnot(require(assertthat
, quietly = TRUE
, warn.conflicts = FALSE
)
)
assert_that(file_test("-f", inputFile))
inputFile <- normalizePath(inputFile)
assert_that(!file.exists(outputFile))
assert_that(is.writeable(dirname(outputFile)))
assert_that(is.count(number))
# sed command
command <- paste0("sed "
, "1,"
, as.character(number)
, "d "
, inputFile
, " > "
, outputFile
)
system(command, intern = TRUE)
return(invisible(TRUE))
}
# csv_sample
#
# Description ----
#
# a utility to sample a few lines from a large CSV on disk onto a
# different CSV file.
#
# Usage ----
#
# csv_sample("iris.csv")
#
# Arguments ----
#
# path : path of the input CSV file
# lineNumbers: a vector of integers indicating the line numbers to be
# sampled. Defaults to NULL.
# percent : percentage of lines to be sampled randomly when
# lineNumbers is NULL. Defaults to 10.
# chunkSize : number of lines read at a time while seeking to the next
# sampled line. Defaults to 1000.
# seed : a seed to sample the given percentage of lines when
# lineNumbers is NULL. If missing, a random seed between
# 1 and 100 (inclusive of both) is chosen.
# toReplace : boolean indicating whether to sample the given percentage
# of lines with replacement. Defaults to FALSE.
# forceGC : positive integer indicating how often a gc()
# call is made. Defaults to 100. (see details)
#
# Imports/Depends ----
#
# Packages `assertthat`, `iterators`, `itertools`
#
# Author ----
#
# Srikanth KS (gmail at sri.teach)
#
# Details ----
#
# The input CSV file is read sequentially and the chosen lineNumbers are
# written onto a different CSV file. Despite removing temporary objects,
# R does not release memory to the OS, making it necessary to call gc in
# order to maintain a low memory footprint. But calling gc often makes the
# function slow. forceGC takes a positive integer indicating how many
# lineNumbers are processed before a routine gc call is made. This might
# not be a predictable handle on memory, as lineNumbers might be chosen
# randomly and not uniformly.
#
# function csv_sample ----
#
csv_sample <- function(path
, lineNumbers = NULL
, percent = 10L
, chunkSize = 1000
, seed
, toReplace = FALSE
, forceGC = 100){
# assertions ----
if (!require('assertthat', quietly = TRUE, warn.conflicts = FALSE)) {
stop("csv_sample - could not load package 'assertthat'")
}
assert_that(require('iterators'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('itertools'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(file_test("-f", path))
path <- normalizePath(path)
# define how_many
how_many <- function(path, toCount = "lines"){
switch(toCount
, lines = assign("tc", "-l")
, words = assign("tc", "-w")
, characters = assign("tc", "-m"))
wcString <- system(paste0("wc ", tc, " ", path)
, intern = TRUE)
number_tentative <- strsplit(wcString, " ")[[1]][1]
if (number_tentative != "wc:") {
return(as.integer(number_tentative))
} else {
stop(wcString)
}
}
nol <- how_many(path, "lines")
splits <- strsplit(basename(path), split = "\\.")[[1]]
outFileName <- file.path(dirname(path)
, paste(head(splits, 1)
, "_result"
, "."
, tail(splits, n = -1)
, collapse = ""
, sep = "")
)
rm(splits)
assert_that(!file_test("-f", outFileName))
if (!is.null(lineNumbers)) {
assert_that(is.atomic(lineNumbers))
assert_that(class(lineNumbers) == "integer")
assert_that(max(lineNumbers) <= nol)
} else {
assert_that(is.number(percent))
assert_that(percent > 0)
assert_that(percent <= 100)
assert_that(is.flag(toReplace))
if (!missing(seed)) {
assert_that(is.count(seed))
}
}
file.create(outFileName)
# get line numbers to be selected ----
if (is.null(lineNumbers)) {
if (missing(seed)) {
s <- sample(1:100,1)
message("seed chosen: ", s)
set.seed(s)
rm(s)
lineNumbers <- sample(1:nol
, ceiling(nol * percent/100)
, replace = toReplace)
} else{
set.seed(seed)
lineNumbers <- sample(1:nol
, ceiling(nol * percent/100)
, replace = toReplace)
}
}
# read lines ard then write appropriately ----
lni <- ihasNext(iter(sort(lineNumbers)))
currentLine <- 1L
connec <- file(path, "r")
on.exit(close(connec), add = TRUE)
outConnec <- file(outFileName, "w")
on.exit(close(outConnec), add = TRUE)
# fun to move readlines pointer to lineNumber and update currentLine
goToLine <- function(lineNumber){
difference <- lineNumber - currentLine
if (difference == 0) {
return(TRUE)
}
quo <- difference %/% chunkSize
rem <- difference - (quo*chunkSize)
if (quo > 0) {
replicate(quo
, {
temp <- readLines(connec, chunkSize)
rm(temp)
}
)
}
temp <- readLines(connec, rem)
rm(temp)
assign("currentLine", lineNumber, envir = parent.frame())
return(invisible(TRUE))
}
gcIter <- 1
while (hasNext(lni)) {
goToLine(nextElem(lni))
if (gcIter %% forceGC == 0) {gc()}
writeLines(readLines(connec, 1L), con = outConnec)
currentLine <- currentLine + 1L
gcIter <- gcIter + 1L
}
return(invisible(TRUE))
}
# csv_sampleR
#
# Description ----
#
# sample a CSV into R as a dataframe
#
# Usage ----
#
# csv_sampleR("iris.txt")
#
# Arguments ----
#
# inputFile : CSV file to be sampled
# lineNumbers : an integer vector of valid line numbers. Default: NULL
# percent : percent of lines to be sampled when lineNumbers is NULL
# colTypes : named list with sqlite columns types among 'INTEGER', 'REAL'
# and 'TEXT'. Names form the names of the columns.
# header, skip and sep are similar to inputs of read.table
# seed : a positive integer used to randomly generate row numbers
# when percent comes into play.
#
# Details ----
# The function returns a dataframe sampled from the CSV file. It is up to the
# R user to make sure that the data fits into RAM. The input data is written to
# a table in an SQLite DB and only the required rows are sampled into R.
#
# Imports/Depends ----
#
# Packages `assertthat`, `tools`, `RSQLite`
# The function works on POSIX(unix like) systems ONLY
# system functions used: `wc` (by how_many_lines)
#
# Author ----
#
# Srikanth KS (gmail at sri.teach)
#
# function csv_sampleR
csv_sampleR <- function(inputFile
, lineNumbers = NULL
, percent = 10L
, colTypes = NULL
, header = TRUE
, skip = 0
, sep = ","
, seed = NULL){
# package load ----
stopifnot(require('assertthat'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('RSQLite'
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(require('tools'
, quietly = TRUE
, warn.conflicts = FALSE))
# assertions ----
assert_that(file_test("-f", inputFile))
assert_that(is.null(lineNumbers) || class(lineNumbers) == "integer")
assert_that(is.number(percent))
assert_that(percent > 0 && percent <= 100)
assert_that(is.list(colTypes) || is.null(colTypes))
if (is.list(colTypes)) {
assert_that(length(colTypes) > 1 && !is.null(names(colTypes)))
assert_that(all(colTypes %in% c('INTEGER', 'REAL', 'TEXT')))
}
assert_that(is.flag(header))
assert_that(skip == 0L || is.count(skip))
assert_that(is.string(sep) && nchar(sep) == 1)
assert_that(is.null(seed) || is.count(seed))
# how_many_lines function ----
how_many_lines <- function(path){
wcString <- system(paste0("wc ", "-l", " ", path)
, intern = TRUE)
number_tentative <- strsplit(wcString, " ")[[1]][1]
if (number_tentative != "wc:") {
return(as.integer(number_tentative))
} else {
stop(wcString)
}
}
# setup DB ----
db_name_string <- paste0(basename(file_path_sans_ext(inputFile))
, ".sqlite")
dbPath <- file.path(dirname(inputFile), db_name_string)
assert_that(!file.exists(dbPath))
# insert data into DB ----
db <- dbConnect(SQLite(), dbname = dbPath)
on.exit(dbDisconnect(db), add = TRUE)
on.exit(unlink(dbPath) , add = TRUE)
# read the CSV into csvTable ----
dbWriteTable( db
, name = "csvTable"
, value = inputFile
, field.types = colTypes
, header = header
, skip = skip
, sep = sep
)
# get sample index ----
if (is.null(lineNumbers)) {
nol <- how_many_lines(inputFile)
# seed setting for code replication
if (is.null(seed)) {
seed <- sample(1:1000,1)
message(paste0("chosen seed: ",as.character(seed)))
}
set.seed(seed)
lineNumbers <- sample(1:nol, ceiling(nol*percent/100))
}
# create and run query ----
query <- paste0("SELECT * FROM csvTable WHERE rowid IN ("
, toString(lineNumbers)
, ")")
return(dbGetQuery(db, query))
}
# csv_select_columns.R
# utility to choose selected columns of a delimited file on disk
# and write a different delimited file. Optionally change the
# separator too.
# columnsToStay should specify the column numbers of the
# columns that should stay.
# This was not written to be fast, but to use low memory
# without breaking the R session. chunkSize is the number of rows
# read/written at once. The rule of thumb is that at any point
# of time during the execution, we use thrice the memory of the
# readLines object holding those rows.
# Written for REPL/interactive use; if you need a batch processor,
# better use awk or perl.
csv_select_columns <- function(inputFile
, outputFile
, columnsToStay = 1L
, inputSep = ','
, outputSep = inputSep
, chunkSize = 10
, verbose = TRUE){
stopifnot(require('magrittr', warn.conflicts = FALSE))
start_time <- Sys.time()
# check overwriting output file
stopifnot( !file.exists(outputFile))
# check if input file exists
stopifnot( file.exists(inputFile))
# setup connections
inputCon <- file(inputFile, 'r')
outputCon <- file(outputFile, 'w')
# loop to write(append) output
while (TRUE) {
lines <- readLines(inputCon, n = chunkSize)
if (length(lines) == 0) { break }
lines %>%
strsplit(split = inputSep) %>%
lapply(`[`, columnsToStay) %>%
vapply(function(x)
{ do.call('paste'
, as.list(c(x, sep = outputSep)))}
, FUN.VALUE = character(1)) %>%
writeLines(con = outputCon)
}
# closing connections
close(inputCon)
close(outputCon)
# verbose output
if (verbose == TRUE) {
cat('\n----\n')
cat('Processed'
, inputFile
, 'with'
, inputSep
, 'as separator and generated'
, outputFile
, 'with'
, outputSep
, 'as separator '
, 'with these column numbers'
, as.character(columnsToStay))
cat('\n----\n')
end_time <- Sys.time()
cat('start time:\n')
print(start_time)
cat('end time:\n')
print(end_time)
cat('\n')
print(end_time - start_time)
cat('----')
cat('\n')
}
}
# csv_select_columns2.R
# part of unlimiteR project
# utility to choose selected columns of a delimited file on disk
# and write a different delimited file.
# cols argument specifies the columns to be selected
# this is a thin wrapper on unix command: `cut`
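# illustrative usage (hypothetical file names):
# csv_select_columns2(inputFile = "iris.csv"
#                     , outputFile = "iris_selected.csv"
#                     , cols = c(1L, 5L))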
csv_select_columns2 = function(inputFile
, outputFile
, separator = ","
, cols = 1L
, verbose = FALSE){
# assertions
library("assertthat")
assert_that(file.exists(inputFile))
assert_that(is.readable(inputFile))
assert_that(!file.exists(outputFile))
assert_that(is.string(separator))
assert_that(is.integer(cols) || is.character(cols))
assert_that(is.flag(verbose))
# get the column numbers required
if(class(cols) != "integer"){
fileCon = file(inputFile, "r")
line1 = readLines(fileCon, 1)
close(fileCon)
cols = match(cols, strsplit(line1, separator)[[1]])
# match() returns NA for names that are not found
if(all(is.na(cols))){
stop("ERROR (csv_select_columns2): none of the names in 'cols' match")
}
}
# construct the command
command = paste0("cut"
, " -d \""
, separator
, "\" -f "
, gsub(" ","", toString(cols))
, " "
, inputFile
, " > "
, outputFile
)
# make the system call
system(command = command, intern = TRUE)
# message if verbose
if(verbose){
message(paste0("\n**********************************************\n"
, "input = "
, inputFile
, "\n"
, "output = "
, outputFile
, "\n"
, "columns selected = "
, toString(cols)
, "\n**********************************************\n"
)
)
}
}
# how_many ----
#
# Description ----
# a thin wrapper on unix's `wc` to count lines, words and
# characters
#
# Usage ----
#
# how_many(path = "/home/username/atextfile.txt"
# , toCount = "lines")
#
# Arguments ----
#
# path: path to file
# toCount: one of these- lines, words, characters
#
# Imports/Depends ----
# uses system command: `wc`
# package 'assertthat'
#
# Author ----
# Srikanth KS (gmail at sri.teach)
how_many <- function(path
, toCount = "lines"){
# argument check
stopifnot(require(assertthat
, quietly = TRUE
, warn.conflicts = FALSE))
assert_that(file_test("-f", path))
assert_that(toCount %in% c("lines", "words", "characters"))
switch(toCount
, lines = assign("tc", "-l")
, words = assign("tc", "-w")
, characters = assign("tc", "-m"))
wcString <- system(paste0("wc ", tc, " ", path)
, intern = TRUE)
number_tentative <- strsplit(wcString, " ")[[1]][1]
if (number_tentative != "wc:") {
return(as.integer(number_tentative))
} else {
stop(wcString)
}
}
# convenience functions derived from how_many
#
how_many_lines <- function(path){
how_many(path, toCount = "lines")
}
how_many_words <- function(path){
how_many(path, toCount = "words")
}
how_many_characters <- function(path){
how_many(path, toCount = "characters")
}
---
title: "Vignette: `csv_process` (csv_disk_utils_R)"
author: "Srikanth KS"
date: "17 February 2016"
output: html_document
---
```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```
This is a short introduction to using the csv_process (meta)function of csv_disk_utils_R (https://gist.github.com/talegari/808697ab90117571d5db). The function reads a CSV file in chunks,
processes them with a user specified function and writes them back to a CSV.
We emphasize the memory used by R in the process rather than the time taken (which also
depends on the user specified function).
```{R}
library('magrittr')
source("/home/srikanth/csv_process.R")
set.seed(007)
longvec <- rnorm(10 ^ 8)
write.table(matrix(longvec, ncol = 4)
, "/home/srikanth/largemat.csv"
, row.names = FALSE
, col.names = FALSE
, sep = ","
, quote = FALSE)
rm(longvec)
# size of the exported CSV in MB
print(file.size("/home/srikanth/largemat.csv")/2 ^ 20)
# for chunk_list
invisible(gc(reset = TRUE, verbose = FALSE))
# current memory usage in MB
initial <- gc()[,6] %>% sum
initial
# create a custom function
customcor_list <- function(chunk_list){
lapply(chunk_list, function(x){cor(as.numeric(x),1:4)})
}
# how many cores are available?
parallel::detectCores()
start_time <- Sys.time()
# call the function
csv_process(
inputFile = "/home/srikanth/largemat.csv"
, outputFile = "/home/srikanth/largemat_output_list.csv"
, fun = customcor_list
, chunkSize = 100000
, rowThres = 100001
)
end_time <- Sys.time()
print(end_time - start_time)
# check maximum memory usage during the course of the fun run in MB
maximum <- gc()[,6] %>% sum
# maximum amount of memory consumed during the run
maximum - initial
# read first few lines of the output CSV
read.table("/home/srikanth/largemat_output_list.csv"
, nrows = 5
, sep = ','
, header = FALSE)
# for chunk_df
invisible(gc(reset = TRUE, verbose = FALSE))
# current memory usage in MB
initial <- gc()[,6] %>% sum
initial
# create a custom function
customcor_df <- function(chunk_df){
chunk_df[,1, drop = FALSE]
}
# how many cores are available?
parallel::detectCores()
start_time <- Sys.time()
# call the function
csv_process(inputFile = "/home/srikanth/largemat.csv"
, outputFile = "/home/srikanth/largemat_output_df.csv"
, fun = customcor_df
, chunkSize = 100000
, rowThres = 100001
)
end_time <- Sys.time()
print(end_time - start_time)
# check maximum memory usage during the course of the function run in MB
maximum <- gc()[,6] %>% sum
maximum
# maximum amount of memory consumed during the run
maximum - initial
# read first few lines of the output CSV
read.table("/home/srikanth/largemat_output_df.csv"
, nrows = 5
, sep = ','
, header = FALSE)
# clean-up
unlink("/home/srikanth/largemat_output_df.csv")
#unlink("/home/srikanth/largemat_output_list.csv")
unlink("/home/srikanth/largemat.csv")
```
#### Auxiliary Information
```{R}
sessionInfo()
Sys.info()
```
---
title: 'Vignette: `csv_aggregate` (unlimiteR)'
author: "Srikanth KS"
output: html_document
---
```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```
This is a short introduction to using the `csv_aggregate` function of [`unlimiteR`](https://gist.github.com/talegari/808697ab90117571d5db).
The function aggregates a CSV file: a user-specified function is applied to each split and the results are combined back into a CSV.
We emphasize the memory used by `R` in the process rather than the time taken (which also depends on the user specified function).
```{R}
library('magrittr')
library('assertthat')
library('RSQLite')
library('iterators')
library('foreach')
library('parallel')
library('devtools')
# source csv_aggregate function from from gist
source_url("http://bit.ly/1oAfevY")
# generate some data
if (file.exists("/home/srikanth/large.csv")) {
unlink("/home/srikanth/large.csv")
}
replicate(10000,
data.frame(1:1000,t(replicate(1000,rnorm(7)))) %>%
write.table("/home/srikanth/large.csv"
, col.names = FALSE
, row.names = FALSE
, sep = ","
, append = TRUE)
) %>% invisible
# input file size in MB
file.size("/home/srikanth/large.csv")/2^20
# user specified function, simple enough to verify the result
pickLeftTop <- function(df){df[1,1,drop = FALSE]}
# how many lines does the dataset contain? (has 8 columns)
system("wc -l /home/srikanth/large.csv", intern = TRUE)
invisible(gc(reset = TRUE, verbose = FALSE))
# current memory usage in MB
initial <- gc()[,6] %>% sum
initial
# how many cores are available?
parallel::detectCores()
start_time <- Sys.time()
# call the function
csv_aggregate(inputFile = "/home/srikanth/large.csv"
, outputFile = "/home/srikanth/large_res.csv"
, groupBy = c('a')
, colTypes = list(a = 'REAL'
, b = 'REAL'
, c = 'REAL'
, d = 'REAL'
, e = 'REAL'
, f = 'REAL'
, g = 'REAL'
, h = 'REAL'
)
, skipLines = 0
, fun = pickLeftTop)
end_time <- Sys.time()
print(end_time - start_time)
# check maximum memory usage during the course of the fun run in MB
maximum <- gc()[,6] %>% sum
maximum
# maximum amount of memory consumed during the run in MB
print(maximum - initial)
# clean-up
unlink("/home/srikanth/large.csv")
#unlink("/home/srikanth/large_res.csv")
```
#### Auxiliary Information
```{R}
sessionInfo()
Sys.info()
```