Last active
November 16, 2023 00:34
-
-
Save mskyttner/46601e60138a886cdfbfc81067c71725 to your computer and use it in GitHub Desktop.
Shell wrapper for duckdb to support reading from stdin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env Rscript
# Stream delimited text (stdin or a file) into a DuckDB table named `stdin`
# chunk by chunk, then run a SQL query against it and/or export it.
#
# usage for example:
# cat data/mydatafile.tsv | head -n 1000 | ./duckstream.R --sql "select mycolumn from stdin;"
library(optparse)
library(readr)
suppressPackageStartupMessages(library(duckdb))

# Command line interface ----
option_list <- list(
  make_option(c("-i", "--input"),
    action = "store", default = "stdin",
    help = "Input file (tab separated values/tsv) to read [default 'stdin']"
  ),
  make_option(c("-1", "--no-header"),
    action = "store_true", default = FALSE,
    help = "Do not parse first row of input file as a header with column names, instead generate col names 'X1, X2, ...'"
  ),
  make_option(c("-d", "--delim"),
    action = "store", default = "\t",
    help = "Delimiter used [default is '\\t']"
  ),
  make_option(c("-Q", "--sql"),
    action = "store", default = "select * from stdin limit 10;",
    help = "Query to issue after loading [default is '%default']",
    metavar = "select * from stdin limit 10;"
  ),
  make_option(c("-o", "--output"),
    action = "store", default = ":memory:",
    help = "Output file to use [default is '%default']",
    metavar = ":memory:"
  ),
  make_option(c("-D", "--dump"),
    action = "store", default = NULL,
    help = "Generate dump file in parquet format at specified directory"
  ),
  # Integer defaults make optparse parse these values as integers.
  make_option(c("--lookahead"),
    action = "store", default = 500L,
    help = "Lookahead; number of lines to read when inferring column types [default %default]",
    metavar = "500"
  ),
  make_option(c("--chunksize"),
    action = "store", default = 5e4L,
    help = "Chunk size in lines when batch loading data [default %default]",
    metavar = "50000"
  ),
  make_option(c("-q", "--quiet"),
    action = "store_true", default = FALSE,
    help = "Do not print default query results to stdout"
  ),
  # The flag suppresses the header line in the printed query result.
  make_option(c("-0", "--hide-header"),
    action = "store_true", default = FALSE,
    help = "Do not show column names as header in output"
  )
)
opt <- parse_args(OptionParser(option_list = option_list))
# Input and database connections ----
# `stdin` is a connection to the input (it shadows base::stdin()); the reader
# functions below consume it line by line.
stdin <- file(opt$input, open = "r")
con <- duckdb::dbConnect(duckdb::duckdb(dbdir = opt$output))

# on.exit() registers nothing when called at top level, so cleanup is attached
# as an exit finalizer instead: R runs `onexit = TRUE` finalizers when the
# session ends (including via q()). try() keeps shutdown errors quiet.
cleanup_hook <- new.env()
reg.finalizer(cleanup_hook, function(e) {
  try(close(stdin), silent = TRUE)
  try(duckdb::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
}, onexit = TRUE)

# Streaming state shared with the reader functions (updated via `<<-`).
is_eof <- FALSE    # TRUE once a read returned fewer lines than requested
is_header <- FALSE # currently unused
i <- 0             # number of input lines consumed so far
has_noheader <- isTRUE(opt$`no-header`)
# Read the first `n_size` input lines and let readr infer the column names and
# types from them.
#
# Returns a list with `data` (the parsed sample as a tibble) and `colspec`
# (the inferred readr column spec, reused to parse every later chunk).
# Side effects: sets the globals `i` (lines consumed) and `is_eof` (TRUE when
# fewer than `n_size` lines were available).
read_header <- function(n_size = opt$lookahead, delim = opt$delim) {
  lines <- readLines(stdin, n = n_size)
  if (length(lines) == 0) {
    stop("No input lines to read", call. = FALSE)
  }
  i <<- length(lines)
  is_eof <<- i < n_size
  # readLines() strips newlines, and readr only treats character input as
  # literal data (instead of file paths) when an element contains a newline;
  # append one, exactly as read_chunk() does.
  text <- c(lines, "\n")
  header <- if (has_noheader) {
    readr::read_delim(text, delim = delim, col_names = FALSE)
  } else {
    readr::read_delim(text, delim = delim)
  }
  list(data = header, colspec = readr::spec(header))
}
# Create table `tablename` (if it does not exist yet) with the schema of the
# data frame `header`, then insert the rows of `header` into it.
#
# `header` is exposed to DuckDB as the temporary view `temp` and unregistered
# again afterwards. Returns (invisibly) FALSE when the insert reported a row
# count.
write_header <- function(header, temp = "header", tablename = "stdin") {
  duckdb::duckdb_register(con, temp, header)
  on.exit(duckdb::duckdb_unregister(con, temp), add = TRUE)
  # Create only the schema here, so the sample rows are still inserted when
  # the table already exists (e.g. a file-backed --output database).
  create_sql <- sprintf(
    "create table if not exists %s as select * from %s limit 0;",
    tablename, temp
  )
  DBI::dbExecute(con, create_sql)
  insert_sql <- sprintf("insert into %s select * from %s;", tablename, temp)
  invisible(is.na(DBI::dbExecute(con, insert_sql)))
}
# Read up to `n_size` further input lines and parse them with the column spec
# inferred by read_header(), so every chunk gets identical names and types.
#
# Side effects: advances the global line counter `i` and sets the global
# `is_eof` when fewer than `n_size` lines were available.
read_chunk <- function(n_size = opt$chunksize, colspec) {
  chunk_lines <- readLines(stdin, n = n_size)
  n_read <- length(chunk_lines)
  i <<- i + n_read
  is_eof <<- n_read < n_size
  # The extra "\n" element makes readr read the input as literal text rather
  # than as file paths.
  spec_cols <- colspec$cols
  readr::read_delim(
    c(chunk_lines, "\n"),
    delim = opt$delim,
    col_names = names(spec_cols),
    col_types = spec_cols
  )
}
# Append the data frame `x` to table `tablename` by registering it as the
# temporary DuckDB view `temp` and running INSERT ... SELECT.
#
# Returns (invisibly) FALSE when the insert reported a row count.
write_chunk <- function(x, temp = "chunk", tablename = "stdin") {
  duckdb::duckdb_register(con, temp, x)
  # Unregister the view that was actually registered (`temp`), even when the
  # insert below fails.
  on.exit(duckdb::duckdb_unregister(con, temp), add = TRUE)
  sql <- sprintf("insert into %s select * from %s;", tablename, temp)
  invisible(is.na(DBI::dbExecute(con, sql)))
}
# Move one chunk of at most `n` input lines into the `stdin` table, parsing it
# with the column spec `colspec`. Returns whatever write_chunk() returns.
process_chunk <- function(colspec, n = opt$chunksize) {
  write_chunk(read_chunk(n_size = n, colspec = colspec))
}
# Load the whole input into DuckDB.
#
# First a lookahead sample is read to infer the schema and create the table;
# then the rest of the input is appended chunk by chunk inside one
# transaction, followed by a checkpoint. Returns is.na() of the checkpoint
# result (FALSE on success).
process_stream <- function() {
  run_sql <- function(statement) DBI::dbExecute(con, statement)

  guess <- read_header()
  write_header(guess$data)

  run_sql("begin transaction;")
  repeat {
    if (is_eof) break
    process_chunk(colspec = guess$colspec)
  }
  run_sql("commit;")

  checkpoint_result <- run_sql("checkpoint;")
  is.na(checkpoint_result)
}
# Run the import, then print the query result ----
is_done <- process_stream()
if (!isTRUE(opt$quiet)) {
  out <- DBI::dbGetQuery(con, opt$sql)
  show_header <- !isTRUE(opt$`hide-header`)
  tsv <- readr::format_delim(out, delim = opt$delim, col_names = show_header)
  # format_delim() already ends every line with a newline, so emit the text
  # unchanged; an empty result then prints nothing instead of a blank line.
  cat(tsv, file = stdout())
}
# Optional parquet export, then exit ----
if (!is.null(opt$dump)) {
  # TODO work around "Error: Not implemented Error: Cannot convert VIEW to SQL
  # because it was not created with a SQL statement": copy the data into a
  # plain table `export` and drop `stdin` before exporting.
  # NOTE(review): with a file-backed --output this permanently removes `stdin`.
  DBI::dbExecute(con, "create table export as select * from stdin;")
  DBI::dbExecute(con, "drop table stdin;")
  # The directory comes from the command line, so quote it as a SQL string
  # literal. dbExecute() raises an error itself if the export fails; its
  # return value is an affected-row count, not a failure flag.
  export_sql <- sprintf(
    "export database %s (format parquet);",
    DBI::dbQuoteString(con, opt$dump)
  )
  DBI::dbExecute(con, export_sql)
  if (!dir.exists(opt$dump)) stop("Export failed!")
}
q(save = "no", status = as.integer(isTRUE(is_done)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment