Skip to content

Instantly share code, notes, and snippets.

@vnijs
Last active August 30, 2021 07:00
Show Gist options
  • Save vnijs/e53a68c957747e82c2e3 to your computer and use it in GitHub Desktop.
Save vnijs/e53a68c957747e82c2e3 to your computer and use it in GitHub Desktop.
Reading a csv file into an sqlite database in chunks
library(dplyr)
library(readr)
library(DBI)
library(RSQLite)
## Import a CSV file into an SQLite table in batches using base read.csv().
##
## csv_file    path to a CSV file with a header row
## sqlite_file path to the SQLite database file
## table_name  name of the target table; an existing table is overwritten
## batch_size  number of records read and written per batch; the column
##             classes found in the first batch are reused for every batch
##
## The file is read through one open connection, so each read.csv() call
## resumes where the previous one stopped. No external `wc` call is needed:
## reading stops when a batch comes back shorter than batch_size or the file
## is exhausted. Prints the number of records written; returns TRUE invisibly.
read.csv2sqlite <- function(csv_file, sqlite_file, table_name, batch_size = 10000) {
  stopifnot(length(batch_size) == 1L, batch_size >= 1)
  ## integer batch size, so the "full batch" comparison below is exact
  batch_size <- as.integer(batch_size)
  ## establish a connection to the database; always release it, even on error
  condb <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(condb), add = TRUE)
  ## open file connection; closed on exit, even if a read or write fails
  f <- file(csv_file, open = "r")
  on.exit(close(f), add = TRUE)
  ## first pass determines header and column classes for all later batches
  dat <- read.csv(f, nrows = batch_size)
  col_names <- colnames(dat)
  col_types <- vapply(dat, function(col) class(col)[1], character(1))
  ## write to database table
  dbWriteTable(condb, table_name, dat, overwrite = TRUE)
  total_records <- nrow(dat)
  ## a batch shorter than batch_size means the end of the file was reached;
  ## after a full batch, peek one line to learn whether any data is left and
  ## push it back so the next read.csv() call sees it
  while (nrow(dat) == batch_size) {
    peek <- readLines(f, n = 1L)
    if (length(peek) == 0L) break
    pushBack(peek, f)
    dat <- read.csv(f, nrows = batch_size, header = FALSE,
                    col.names = col_names, colClasses = col_types)
    dbWriteTable(condb, table_name, dat, append = TRUE)
    total_records <- total_records + nrow(dat)
  }
  message("Total records: ", total_records)
  invisible(TRUE)
}
## Import a CSV file into an SQLite table in batches using readr.
##
## csv_file    path to a CSV file with a header row
## sqlite_file path to the SQLite database file
## table_name  name of the target table; an existing table is overwritten
## batch_size  number of records parsed and written per batch; column types
##             are guessed from the first batch and reused for all batches
##
## The file is streamed once with read_csv_chunked(), rather than re-read
## from the top with a growing `skip` for every batch. Parsing problems are
## printed per batch. Prints the number of records written; returns TRUE
## invisibly.
read_csv2sqlite <- function(csv_file, sqlite_file, table_name, batch_size = 10000) {
  stopifnot(length(batch_size) == 1L, batch_size >= 1)
  batch_size <- as.integer(batch_size)
  ## establish a connection to the database; always release it, even on error
  condb <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(condb), add = TRUE)

  first_batch <- TRUE
  total_records <- 0
  ## callback invoked by readr for each batch; `pos` (the starting row) is
  ## part of readr's required callback signature and is not needed here
  write_batch <- function(dat, pos) {
    if (nrow(problems(dat)) > 0) print(problems(dat))
    dat <- as.data.frame(dat)
    if (first_batch) {
      ## the first batch (re)creates the table, later batches append
      dbWriteTable(condb, table_name, dat, overwrite = TRUE)
      first_batch <<- FALSE
    } else {
      dbWriteTable(condb, table_name, dat, append = TRUE)
    }
    total_records <<- total_records + nrow(dat)
    ## a callback returning FALSE makes readr stop reading, so be explicit
    invisible(TRUE)
  }
  read_csv_chunked(csv_file, callback = write_batch, chunk_size = batch_size,
                   guess_max = batch_size)

  ## a header-only file delivers no batches; still create the (empty) table
  if (first_batch) {
    header <- read_csv(csv_file, n_max = 0,
                       col_types = cols(.default = col_character()))
    dbWriteTable(condb, table_name, as.data.frame(header), overwrite = TRUE)
  }
  message("Total records: ", total_records)
  invisible(TRUE)
}
## create data (seeded so the benchmark data is reproducible)
set.seed(1234)
fpath <- "~/Desktop"
csv_file <- file.path(fpath, "simdat.csv")
batch_size <- 10000
nr_obs <- 1000 * batch_size
simdat <- data.frame(
  col0 = seq_len(nr_obs),
  col1 = sample(letters, nr_obs, TRUE),
  col2 = rnorm(nr_obs),
  col3 = sample(1:100, nr_obs, replace = TRUE)
)
write_csv(simdat, csv_file)
table_name <- "data"

## using read_csv
sqlite_read_csv <- file.path(fpath, "read_csv.sqlite3")
system.time(read_csv2sqlite(csv_file, sqlite_read_csv, table_name, batch_size))

## using read.csv with file connection
sqlite_read.csv <- file.path(fpath, "read.csv.sqlite3")
system.time(read.csv2sqlite(csv_file, sqlite_read.csv, table_name, batch_size))

## read a whole table back from an SQLite file. DBI is used directly because
## dplyr::src_sqlite() is deprecated; the connection is always closed.
read_sqlite_table <- function(sqlite_file, table) {
  condb <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(condb), add = TRUE)
  dbReadTable(condb, table)
}

## check for read_csv
all(read_sqlite_table(sqlite_read_csv, table_name) == read_csv(csv_file))

## check for read.csv (using read_csv below will produce FALSE)
all(read_sqlite_table(sqlite_read.csv, table_name) == read.csv(csv_file))

## all in one go with read_csv
system.time({
  condb <- dbConnect(SQLite(), sqlite_read_csv)
  read_csv(csv_file) %>% as.data.frame %>%
    dbWriteTable(condb, table_name, ., overwrite = TRUE)
  dbDisconnect(condb)
})

## all in one go with read.csv
system.time({
  condb <- dbConnect(SQLite(), sqlite_read.csv)
  read.csv(csv_file) %>%
    dbWriteTable(condb, table_name, ., overwrite = TRUE)
  dbDisconnect(condb)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment