Skip to content

Instantly share code, notes, and snippets.

@russellpierce
Last active August 18, 2017 17:39
Show Gist options
  • Save russellpierce/b5abf769527b5040817fbc946af2b2be to your computer and use it in GitHub Desktop.
Save russellpierce/b5abf769527b5040817fbc946af2b2be to your computer and use it in GitHub Desktop.
Connect to a JDBC db in R via a single node cluster
# Very raw dump of prototype code that allows for connection to a JDBC db via an ad-hoc one-node cluster. This demo code was designed to connect to Redshift using Amazon's JDBC drivers - because sometimes you just want to do things the hard way.
# To make this work, you'll need to update the classPath, driverClass, and of course, the host settings.
#library(plyr); needs to be available, but doesn't need to be attached
library(whisker)
library(R6)
library(logging)
library(R.utils) #tempvar
library(parallel) #makeCluster and friends
library(magrittr) # chains for the win
library(data.table) #... because I used to use data.table for everything, now I find that dplyr support for data.table is slipping :(
# Four functions to standardize the db response and deal with changing db column names
# Convert a named list of scalar values into a two-column data.table.
#
# targetList: named list; each element must hold exactly one value.
# keyName:    name of the column that receives names(targetList).
# valueName:  name of the column that receives the unlisted values.
#
# Returns a data.table with one row per list element. An empty list gives a
# zero-row table with two character columns (the original built its scaffold
# with 1:length(targetList), which is c(1, 0) for an empty list).
listOfKeyValueToDT <- function(targetList, keyName = "key", valueName = "value") {
  keys <- names(targetList)
  if (length(targetList) > 0 && is.null(keys)) {
    stop("targetList must be a named list", call. = FALSE)
  }
  values <- unlist(targetList, use.names = FALSE)
  if (length(targetList) == 0) {
    # names() and unlist() both return NULL for an empty list; use typed
    # zero-length vectors so the result still has both columns.
    keys <- character(0)
    values <- character(0)
  }
  if (length(values) != length(keys)) {
    stop("every element of targetList must hold exactly one value", call. = FALSE)
  }
  DT <- data.table(.key = keys, .value = values)
  # Positional rename: the caller-chosen names replace the placeholders.
  setnames(DT, c(keyName, valueName))
  DT
}
# Map raw column names onto their standardized spellings.
#
# x:        character vector of names to translate.
# .version: lookup version; version 1 adds the "byproductsuuid" rename.
# ...:      extra old = new pairs. They override built-in entries that share
#           the same old name. The special keys idReplace / nameReplace are
#           rewritten so they apply to the columns "id" / "name".
#
# Returns x with every name found in the lookup replaced; other names are
# returned untouched.
standardizeColumnText <- function(x, .version, ...) {
  overrides <- list(...)
  hasOverrides <- length(overrides) > 0
  if (hasOverrides) {
    overrideTable <- listOfKeyValueToDT(overrides, keyName = "old", valueName = "new")
  }

  # Renames shared by every version.
  renames <- as.data.table(rbind(c("activeuser", "activeUser")))
  if (.version == 1) {
    # Version 1 carries one additional rename.
    versionOneRenames <- as.data.table(rbind(c("byproductsuuid", "byProductsUUID")))
    renames <- rbind(renames, versionOneRenames)
  }
  setnames(renames, c("V1", "V2"), c("old", "new"))

  if (hasOverrides) {
    # Caller-supplied pairs win over built-in ones with the same old name.
    renames <- rbind(renames[!old %in% overrideTable$old, ], overrideTable)
  }

  # Debugging showed that under parallel processing a plain data.frame was
  # required here.
  renames <- as.data.frame(renames)
  renames[renames$old == "idReplace", "old"] <- "id"
  renames[renames$old == "nameReplace", "old"] <- "name"

  # Only hand mapvalues the entries that actually occur in x.
  present <- renames$old %in% x
  plyr::mapvalues(x, renames[present, "old"], renames[present, "new"])
}
# Rename the columns of a table to their standardized spellings.
#
# DT:          data.table, or anything as.data.table() accepts.
# idReplace:   name the column "id" should be given.
# nameReplace: name the column "name" should be given.
# version:     lookup version handed to standardizeColumnText(). The original
#              accepted this argument but always passed 1; it is now honored
#              (the default of 1 keeps existing calls unchanged).
# ...:         further old = new pairs forwarded to standardizeColumnText().
#
# Returns the renamed data.table. A data.table input is renamed by reference.
standardizeColumnNames <- function(DT, idReplace = "id", nameReplace = "name", version = 1, ...) {
  if (!is.data.table(DT)) {
    DT <- as.data.table(DT)
  }
  newnames <- standardizeColumnText(
    names(DT),
    .version = version,
    idReplace = idReplace,
    nameReplace = nameReplace,
    ...
  )
  # Rename by position so duplicated column names in a query result cannot
  # make the old-name lookup ambiguous.
  setnames(DT, newnames)
  DT
}
# Convert columns whose values are all textual booleans into logical columns.
#
# x:           data.table (or data.frame) to convert; columns are replaced
#              through data.table's set().
# checkLength: unused; kept so existing calls keep working.
#
# A column qualifies only when every value is one of the accepted tokens, so
# a column containing NA is left as it is. Tokens are mapped explicitly
# because as.logical() turns "t" and "f" into NA, which made the original
# blank out columns that used those spellings.
#
# Returns x.
convertLogicalColumns <- function(x, checkLength) {
  if (nrow(x) == 0) {
    return(x)
  }
  trueTokens <- c("true", "T", "t", "TRUE")
  falseTokens <- c("false", "F", "f", "FALSE")
  booleanTokens <- c(trueTokens, falseTokens)

  isBooleanLike <- vapply(
    x,
    function(column) all(column %in% booleanTokens),
    logical(1)
  )
  # Work with column positions so duplicated column names are handled too.
  for (j in which(isBooleanLike)) {
    # Every value is a known token, so membership in trueTokens is the value.
    set(x, j = j, value = x[[j]] %in% trueTokens)
  }
  x
}
# A couple functions because I went through a stage where I loved logging
# Collapse all arguments into a single log message string.
# The pieces are joined by .makeMessage(), i.e. without any separator.
logPrefix <- function(...) {
  paste(.makeMessage(...))
}
# Base class that gives every component its own named logger.
#
# initialize(componentName, logLevel, ...):
#   fetches the logger called componentName, attaches the console handler,
#   applies logLevel and stores any further arguments in
#   private$initializationDefaults.
# setLogLevel(level):
#   sets the level on both the logger and its console handler.
loggingRoot <- R6Class(
  "loggingRoot",
  public = list(
    initialize = function(componentName, logLevel = 30, ...) {
      private$log <- getLogger(componentName)
      private$log$addHandler(writeToConsole)
      self$setLogLevel(logLevel)
      private$log$debug("logger started")
      # Keep the extra constructor arguments around.
      private$initializationDefaults <- list(...)
    },

    setLogLevel = function(level) {
      # Logger and handler each carry a level; keep them in step.
      private$log$level <- level
      setLevel(level, private$log$handlers$writeToConsole)
    }
  ),
  private = list(
    log = NULL,
    initializationDefaults = list()
  )
)
# A class to hold host entries: address, port and credentials.
#
# initialize(host, port, user, password, ...): stores the four values;
#   anything passed through ... is ignored.
# unpack(...): returns the arguments given in ... followed by the stored
#   host, password, port (as integer) and user.
host <- R6Class(
  "host",
  public = list(
    host = character(),
    port = numeric(),
    user = character(),
    password = character(),

    initialize = function(host, port, user, password, ...) {
      self$host <- host
      self$port <- port
      self$user <- user
      self$password <- password
    },

    unpack = function(...) {
      c(
        ...,
        list(
          host = self$host,
          password = self$password,
          port = as.integer(self$port),
          user = self$user
        )
      )
    }
  )
)
# A class to hold SQL host entries: a host plus a database name.
#
# initialize(..., db): stores db and hands the remaining arguments to the
#   host constructor.
# unpack(...): the parent's unpack() result with db appended.
sqlHost <- R6Class(
  "sqlHost",
  inherit = host,
  public = list(
    db = character(),

    initialize = function(..., db) {
      self$db <- db
      super$initialize(...)
    },

    unpack = function(...) {
      inherited <- super$unpack(...)
      c(inherited, list(db = self$db))
    }
  )
)
# The actual class to abstract the JDBC connection.
#
# Every query runs inside a freshly started one-node cluster, so the JVM and
# the JDBC driver live in a separate R process that is discarded afterwards.
#
# initialize(host, driverClass, classPath, identfier.quote, jdbcDbName,
#            logLevel, ...):
#   host            object providing unpack() (see the host classes).
#   driverClass     JDBC driver class name.
#   classPath       path to the driver jar.
#   identfier.quote accepted but not used by this class.
#   jdbcDbName      name used for the logger.
#   logLevel, ...   forwarded to the loggingRoot constructor.
#
# query(statement, template):
#   Renders whisker tags in statement, runs it on a worker node and returns
#   the result as a data.table with standardized column names and boolean-like
#   columns converted to logical. Errors raised by the query are logged and
#   re-raised; the worker node is always shut down.
jdbcDb <-
  R6Class("jdbcDb",
    inherit = loggingRoot,
    public = list(
      initialize = function(host, driverClass = "", classPath = "", identfier.quote = "", jdbcDbName = tempvar("jdbcDb"), logLevel = 30, ...) {
        super$initialize(componentName = jdbcDbName, logLevel = logLevel, ...)
        private$log$finest("Initializing")
        private$hostInformation <- host$unpack()
        private$driverClass <- driverClass
        private$classPath <- classPath
        private$jdbcDbName <- jdbcDbName
        private$log$finest("Initialization variables exported")
      },

      query = function(statement, template = list()) {
        private$log$finest("Starting query")

        # redshiftIdioms is an optional object defined outside this class;
        # render with the template alone when it is absent.
        idioms <- get0("redshiftIdioms", ifnotfound = list())
        while (length(grep("{", statement, fixed = TRUE)) > 0) {
          private$log$fine("nesting into whiskers")
          rendered <- whisker.render(statement, c(template, idioms))
          # A brace that rendering cannot remove would otherwise loop forever.
          if (identical(rendered, statement)) break
          statement <- rendered
        }

        # Start the worker on a random port. Startup errors (for example a
        # port that is already taken) are retried, but only a bounded number
        # of times, and half-started nodes are shut down again.
        jvmNode <- NULL
        for (attempt in seq_len(10L)) {
          candidate <- try(
            makeCluster(1, methods = FALSE, port = sample(11000:11999, 1), rscript_args = "--vanilla"),
            silent = TRUE
          )
          if (inherits(candidate, "try-error")) next
          if (isTRUE(try(clusterEvalQ(candidate, 2) == 2, silent = TRUE))) {
            jvmNode <- candidate
            break
          }
          try(stopCluster(candidate), silent = TRUE)
        }
        if (is.null(jvmNode)) {
          stop("unable to start a worker node for the JDBC connection", call. = FALSE)
        }

        # Single cleanup handler: disconnect first (if connected), then stop
        # the node, whichever way this function exits.
        connected <- FALSE
        on.exit({
          if (connected) {
            try(clusterEvalQ(jvmNode, dbDisconnect(conn)), silent = TRUE)
          }
          try(stopCluster(jvmNode), silent = TRUE)
        }, add = TRUE)
        private$log$finest("forkable node started")

        clusterEvalQ(jvmNode, {library(DBI);library(rJava);library(RJDBC)})
        clusterEvalQ(jvmNode, systemRAMFree <- function() {
          #in GB, platform dependent
          as.numeric(system('FREE_KB=$(($(echo `sed -n \'2p;3p;4p\' < /proc/meminfo | sed "s/ \\+/ /g" | cut -d\' \' -f 2 ` | sed "s/ /+/g")));echo $FREE_KB', intern=TRUE))/1024/1024
        })
        clusterEvalQ(jvmNode, options(java.parameters = paste0("-Xmx", systemRAMFree(), "g")))

        hostInformation <- private$hostInformation
        # Use the configured jar; fall back to the historical default when no
        # classPath was supplied to the constructor.
        classPath <- private$classPath
        if (length(classPath) == 0 || !any(nzchar(classPath))) {
          classPath <- "~/redshift/redshiftJDBC41.jar"
        }
        driverClass <- private$driverClass
        java.parameters <- getOption("java.parameters")

        toExport <- as.environment(list(
          statement = statement,
          driverClass = driverClass,
          classPath = classPath,
          hostInformation = hostInformation,
          java.parameters = java.parameters
        ))
        clusterExport(jvmNode, ls(toExport), envir = toExport)
        # NOTE(review): this replaces the -Xmx option set on the node above
        # with the master's java.parameters option, and clears it when the
        # master has none set. Kept as in the original - confirm the intent.
        clusterEvalQ(jvmNode, options(java.parameters = java.parameters))
        clusterEvalQ(jvmNode, rJava::.jinit())
        private$log$finest("JVM Started")
        clusterEvalQ(jvmNode, drv <- RJDBC::JDBC(driverClass, classPath))
        private$log$finest("Driver obtained")
        clusterEvalQ(jvmNode, conn <- dbConnect(drv,
                                                url = hostInformation$host,
                                                user = hostInformation$user,
                                                password = hostInformation$password))
        connected <- TRUE

        private$log$debug(logPrefix("query:", statement))
        result <- tryCatch(
          clusterEvalQ(jvmNode, dbGetQuery(conn, statement))[[1]],
          error = function(e) {
            # Surface the failure instead of turning the error object into a
            # data.table further down.
            private$log$error("query failed: %s", conditionMessage(e))
            stop(e)
          }
        )

        result <- standardizeColumnNames(as.data.table(result))
        #convert results that are manifestly boolean into same
        convertLogicalColumns(result)
      }
    ),
    private = list(
      hostInformation = list(),
      driverClass = NULL,
      classPath = NULL,
      jdbcDbName = NULL
    )
  )
# Define the known hosts. Replace the placeholder URL, credentials and
# database name with real values.
hosts <- list(
  redshiftSql = sqlHost$new(
    host = "jdbc:redshift://yourhost.redshift.amazonaws.com:5439/database_name",
    port = 5439L,
    user = "username",
    password = "password",
    db = "database"
  )
)
# Define a connection object for the Redshift host, logging at level 10.
redshiftDb <- jdbcDb$new(
  hosts$redshiftSql,
  driverClass = "com.amazon.redshift.jdbc41.Driver",
  classPath = "~/redshift/redshiftJDBC41.jar",
  logLevel = 10
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment