Last active
August 30, 2021 07:00
-
-
Save vnijs/e53a68c957747e82c2e3 to your computer and use it in GitHub Desktop.
Reading a CSV file into an SQLite database in chunks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(dplyr) | |
library(readr) | |
library(DBI) | |
library(RSQLite) | |
## Read a CSV file into an SQLite table in batches using base read.csv.
##
## The file is read through one open connection, so each batch continues
## where the previous one stopped (the file is never re-scanned). Column names
## and classes come from the first batch and are reused for every later batch;
## a column that the first batch reads as, e.g., all-NA logical will therefore
## fail to parse if later batches contain text.
##
## csv_file    path to a CSV file with a header row
## sqlite_file path to the SQLite database file
## table_name  table to write; an existing table of that name is overwritten
## batch_size  maximum number of data rows read per batch (>= 1)
##
## Returns TRUE invisibly and emits a message with the number of records
## written.
read.csv2sqlite <- function(csv_file, sqlite_file, table_name,
                            batch_size = 10000) {
  stopifnot(is.numeric(batch_size), length(batch_size) == 1, batch_size >= 1)

  ## establish a connection to the database; released even if a read fails
  condb <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(condb), add = TRUE)

  ## open the file once so successive read.csv calls resume where the
  ## previous call stopped
  f <- file(csv_file, open = "r")
  on.exit(close(f), add = TRUE)

  ## TRUE if the connection still has at least one line; the peeked line is
  ## pushed back so the next read.csv call sees it again
  has_more_lines <- function(con) {
    next_line <- readLines(con, n = 1L, warn = FALSE)
    if (length(next_line) == 0L) {
      return(FALSE)
    }
    pushBack(next_line, con)
    TRUE
  }

  ## first batch determines header and column classes
  dat <- read.csv(f, nrows = batch_size)
  col_names <- colnames(dat)
  ## class(col)[1] keeps this a character vector even for multi-class columns
  col_types <- vapply(dat, function(col) class(col)[1], character(1))
  dbWriteTable(condb, table_name, dat, overwrite = TRUE)
  total_records <- nrow(dat)

  ## Keep reading while the previous batch was full and input remains.
  ## This replaces the old `wc -l` pass count, which was not portable, broke
  ## on relative/symlinked paths, and iterated `2:passes` backwards for small
  ## files.
  last_batch <- nrow(dat)
  while (last_batch == batch_size && has_more_lines(f)) {
    dat <- read.csv(f, nrows = batch_size, header = FALSE,
                    col.names = col_names, colClasses = col_types)
    last_batch <- nrow(dat)
    if (last_batch > 0) {
      dbWriteTable(condb, table_name, dat, append = TRUE)
    }
    total_records <- total_records + last_batch
  }

  message("Total records: ", total_records)
  invisible(TRUE)
}
## Read a CSV file into an SQLite table in batches using readr.
##
## Uses readr::read_csv_chunked, which streams the file once and passes each
## chunk of up to `batch_size` rows to a callback. The previous version called
## read_csv with an increasing `skip`, which rescanned the file from the start
## on every pass and duplicated rows when the file had fewer than
## 2 * batch_size records. Column types are guessed from the first chunk
## (read_csv_chunked's guess_max defaults to chunk_size) and reused for all
## later chunks.
##
## csv_file    path to a CSV file with a header row
## sqlite_file path to the SQLite database file
## table_name  table to write; an existing table of that name is overwritten
## batch_size  maximum number of data rows per chunk (>= 1)
##
## Returns TRUE invisibly. Parsing problems found in a chunk are printed, and
## a message reports the number of records written.
read_csv2sqlite <- function(csv_file, sqlite_file, table_name,
                            batch_size = 10000) {
  stopifnot(is.numeric(batch_size), length(batch_size) == 1, batch_size >= 1)

  ## establish a connection to the database; released even if a read fails
  condb <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(condb), add = TRUE)

  total_records <- 0
  table_created <- FALSE

  ## Callback for read_csv_chunked: the first chunk (re)creates the table and
  ## later chunks are appended. `pos` (first row index of the chunk) is part
  ## of the callback signature but is not needed here.
  write_chunk <- function(chunk, pos) {
    ## inspect problems on the tibble itself; as.data.frame() may drop them
    chunk_problems <- problems(chunk)
    if (nrow(chunk_problems) > 0) print(chunk_problems)
    dbWriteTable(condb, table_name, as.data.frame(chunk),
                 overwrite = !table_created, append = table_created)
    table_created <<- TRUE
    total_records <<- total_records + nrow(chunk)
    ## return TRUE explicitly so reading continues with the next chunk
    invisible(TRUE)
  }

  read_csv_chunked(csv_file, callback = write_chunk, chunk_size = batch_size)

  ## a header-only file yields no chunks; still (re)create an empty table
  if (!table_created) {
    empty <- read_csv(csv_file, n_max = 0)
    dbWriteTable(condb, table_name, as.data.frame(empty), overwrite = TRUE)
  }

  message("Total records: ", total_records)
  invisible(TRUE)
}
## create data ----
## 1000 batches of `batch_size` rows; the seed makes the generated file
## reproducible between runs
set.seed(1234)
fpath <- "~/Desktop"
csv_file <- file.path(fpath, "simdat.csv")
batch_size <- 10000
nr_obs <- 1000 * batch_size
simdat <- data.frame(
  col0 = seq_len(nr_obs),
  col1 = sample(letters, nr_obs, replace = TRUE),
  col2 = rnorm(nr_obs),
  col3 = sample(1:100, nr_obs, replace = TRUE)
)
write_csv(simdat, csv_file)
table_name <- "data"

## using readr (read_csv2sqlite) ----
sqlite_read_csv <- file.path(fpath, "read_csv.sqlite3")
system.time(read_csv2sqlite(csv_file, sqlite_read_csv, table_name, batch_size))

## using read.csv with a file connection ----
sqlite_read.csv <- file.path(fpath, "read.csv.sqlite3")
system.time(read.csv2sqlite(csv_file, sqlite_read.csv, table_name, batch_size))

## Compare a stored SQLite table with a reference data frame.
## Returns TRUE only if the dimensions match and every cell compares equal
## with `==`. This is deliberately an exact round-trip check, because both
## sides parse the same CSV text. The connection is always closed.
table_matches <- function(sqlite_file, table_name, reference) {
  con <- dbConnect(SQLite(), sqlite_file)
  on.exit(dbDisconnect(con), add = TRUE)
  stored <- dbReadTable(con, table_name)
  identical(dim(stored), dim(reference)) && isTRUE(all(stored == reference))
}

## check for read_csv ----
table_matches(sqlite_read_csv, table_name, read_csv(csv_file))

## check for read.csv ----
## Compare against the same parser that built the table. The original author
## noted that comparing against read_csv here gives FALSE. NOTE(review):
## presumably read.csv and read_csv parse some doubles differently in the
## last digit; confirm.
table_matches(sqlite_read.csv, table_name, read.csv(csv_file))

## all in one go with read_csv ----
system.time({
  condb <- dbConnect(SQLite(), sqlite_read_csv)
  read_csv(csv_file) %>%
    as.data.frame() %>%
    dbWriteTable(condb, table_name, ., overwrite = TRUE)
  dbDisconnect(condb)
})

## all in one go with read.csv ----
system.time({
  condb <- dbConnect(SQLite(), sqlite_read.csv)
  read.csv(csv_file) %>%
    dbWriteTable(condb, table_name, ., overwrite = TRUE)
  dbDisconnect(condb)
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment