Last active
February 2, 2017 17:10
-
-
Save daroczig/353c7140c9e45d7cc2db to your computer and use it in GitHub Desktop.
Connect to and query Impala via SSH and impala-shell from R
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#' Connect and query Impala via SSH and impala-shell
#'
#' Runs \code{query} with \code{impala-shell} on a remote host over SSH,
#' writes the result to a remote temporary CSV file, copies that file back
#' with \code{scp} and reads it into R. Both temporary files (remote and
#' local) are removed when the function exits, including on error.
#' @param query SQL query to run (a single string); line breaks are replaced
#'   by spaces so it is passed to impala-shell as a one-liner
#' @param host server hostname or IP (anything accepted by \code{ssh} and
#'   \code{scp}, e.g. a host alias from the SSH config)
#' @param log enable or disable logging of debug/trace messages via
#'   \pkg{futile.logger}; defaults to \code{TRUE} when that package is
#'   installed
#' @return data.table object
#' @export
query_impala <- function(query, host = 'localhost',
                         log = requireNamespace('futile.logger',
                                                quietly = TRUE)) {
  stopifnot(is.character(query), length(query) == 1L,
            is.character(host), length(host) == 1L)

  ## emit a trace message only when logging is enabled; the message is a
  ## sprintf-style format string filled in by futile.logger
  log_trace <- function(...) {
    if (log) futile.logger::flog.trace(...)
  }

  ## measure time of query
  timer <- proc.time()

  ## make sure the query is a one-liner (each LF, CRLF or CR becomes a space)
  query <- gsub('\r\n|[\r\n]', ' ', query)

  ## try to connect and create a remote temp file
  ssh_host <- shQuote(host)
  log_trace('Connecting to %s and creating tempfile.', host)
  rfn <- system(paste(
    'ssh', ssh_host,
    '"mktemp --tmpdir=/tmp rimpala-XXXXXXXX.csv"'
  ), intern = TRUE)
  ## error handling: expect exactly one line (the path) and a zero exit status
  ## (system() sets the "status" attribute only on a non-zero exit)
  if (length(rfn) != 1L || !is.null(attr(rfn, 'status'))) {
    stop('SSH access error.')
  }
  log_trace('Remote tempfile: %s', rfn)

  ## local target for the copy; tempfile() only generates a name, so it is
  ## safe to register its cleanup before the file exists
  lfn <- tempfile(pattern = 'rimpala-', fileext = '.csv')

  ## always clean up both temp files, even if a later step fails
  on.exit({
    log_trace('Deleting temporary files on localhost and remote.')
    unlink(lfn)
    system(paste('ssh', ssh_host, 'rm -f', shQuote(rfn)))
  }, add = TRUE)

  ## build impala-shell query (quoted for the remote shell; the whole command
  ## is quoted once more below for the local shell)
  cmd <- paste(
    'impala-shell -B --quiet -q', shQuote(query),
    '-o', shQuote(rfn),
    '"--output_delimiter=,"',
    '--print_header > /dev/null')

  ## and log then run it; ssh propagates impala-shell's exit status
  log_trace('Querying impala-shell directly via: %s', cmd)
  status <- system(paste('ssh', ssh_host, shQuote(cmd)))
  if (status != 0) {
    stop(sprintf('impala-shell on %s failed with exit status %s.',
                 host, status))
  }

  ## copy CSV file to localhost
  log_trace('Copying tempfile to temporary location on localhost: %s', lfn)
  status <- system(paste(
    'scp -q -C', shQuote(paste0(host, ':', rfn)), shQuote(lfn)))
  if (status != 0 || !file.exists(lfn)) {
    stop(sprintf('Failed to copy %s:%s to %s.', host, rfn, lfn))
  }

  ## read data like a pro, falling back to base R's parser if fread fails
  log_trace('Reading %s bytes of data.', file.info(lfn)$size)
  res <- tryCatch(
    data.table::fread(lfn, sep = ','),
    error = function(e) {
      log_trace('fread failed (%s), falling back to read.csv.',
                conditionMessage(e))
      data.table::data.table(utils::read.csv(lfn))
    })

  ## log query time (temp files are removed by the on.exit handler above)
  log_trace('Query finished with %s rows after %s seconds.',
            nrow(res),
            round((proc.time() - timer)[['elapsed']], 2))

  ## return
  res
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment