# Connect to a JDBC db in R via a single-node cluster
# GitHub gist russellpierce/b5abf769527b5040817fbc946af2b2be (last active August 18, 2017 17:39).
# GitHub flagged this file as containing bidirectional Unicode text that may be
# interpreted or compiled differently than it appears; review it in an editor
# that reveals hidden Unicode characters.
# Very raw dump of prototype code that connects to a JDBC db via an ad-hoc
# one-node cluster. This demo code was designed to connect to Redshift using
# Amazon's JDBC drivers -- because sometimes you just want to do things the hard way.
# To make this work, you'll need to update the classPath, driverClass and, of course, the host settings.
# library(plyr) -- needs to be available, but doesn't need to be attached.
library(whisker) | |
library(R6) | |
library(logging) | |
library(R.utils) #tempvar | |
library(parallel) #makeCluster and friends | |
library(magrittr) # chains for the win | |
library(data.table) #... because I used to use data.table for everything, now I find that dplyr support for data.table is slipping :( | |
# Four functions to standardize the db response and deal with changing db column names
# Turn a named list into a two-column data.table with one row per element.
# The element's name goes in column `keyName` and its value in `valueName`.
# Every element must hold exactly one value. An empty list gives an empty
# two-column table, and an unnamed list gets NA keys.
listOfKeyValueToDT <- function(targetList, keyName = "key", valueName = "value") {
  n <- length(targetList)
  keys <- names(targetList)
  if (is.null(keys)) {
    keys <- rep(NA_character_, n)
  }
  values <- unlist(targetList, use.names = FALSE)
  if (n == 0L) {
    # unlist(list()) is NULL, which data.table() would silently drop as a column.
    values <- character(0)
  }
  if (length(values) != n) {
    # unlist() drops NULL elements and flattens longer ones, misaligning keys.
    stop("listOfKeyValueToDT: every element of targetList must hold exactly one value",
         call. = FALSE)
  }
  DT <- data.table(keys, values)
  setnames(DT, c(keyName, valueName))
  DT
}
# Map raw db column names to their canonical spellings.
#
# x: character vector of names. For a factor, its levels are mapped instead.
# .version: lookup-table version. "activeuser" -> "activeUser" always applies;
#   version 1 also maps "byproductsuuid" -> "byProductsUUID".
# ...: extra renames given as old = "new". They override built-in entries with
#   the same old name. Two keys are special: idReplace = "x" renames a column
#   called "id" to "x", and nameReplace = "x" renames "name" to "x".
# Returns `x` with every name found in the lookup replaced; others unchanged.
standardizeColumnText <- function(x, .version, ...) {
  if (is.factor(x)) {
    levels(x) <- standardizeColumnText(levels(x), .version, ...)
    return(x)
  }
  oldNames <- "activeuser"
  newNames <- "activeUser"
  if (.version == 1) {
    oldNames <- c(oldNames, "byproductsuuid")
    newNames <- c(newNames, "byProductsUUID")
  }
  extra <- list(...)
  if (length(extra) > 0) {
    extraOld <- names(extra)
    extraNew <- unlist(extra, use.names = FALSE)
    if (is.null(extraOld) || any(!nzchar(extraOld)) || length(extraNew) != length(extra)) {
      stop("standardizeColumnText: extra renames must be given as old = \"new\" pairs",
           call. = FALSE)
    }
    # Caller-supplied renames replace built-in ones with the same old name.
    keep <- !oldNames %in% extraOld
    oldNames <- c(oldNames[keep], extraOld)
    newNames <- c(newNames[keep], extraNew)
  }
  # The placeholder keys stand for the real column names "id" and "name".
  oldNames[oldNames == "idReplace"] <- "id"
  oldNames[oldNames == "nameReplace"] <- "name"
  # Plain match() lookup: no plyr dependency and no data.frame round trip.
  # match() returns the first matching entry, which is what mapvalues used.
  hit <- match(x, oldNames)
  found <- !is.na(hit)
  x[found] <- newNames[hit[found]]
  x
}
# Rename DT's columns to their canonical spellings via standardizeColumnText().
#
# DT: a data.table, or anything as.data.table() accepts. A data.table is
#   renamed in place (setnames() works by reference), so the caller's object
#   changes too.
# idReplace, nameReplace: new names for columns called "id" and "name".
# version: lookup-table version handed to standardizeColumnText(). Earlier
#   code always used version 1 and ignored this argument. The default is
#   still 1.
# ...: extra old = "new" renames.
# Returns the renamed data.table.
standardizeColumnNames <- function(DT, idReplace = "id", nameReplace = "name", version = 1, ...) {
  if (!is.data.table(DT)) {
    DT <- as.data.table(DT)
  }
  oldNames <- names(DT)
  if (length(oldNames) == 0L) {
    return(DT)
  }
  newNames <- standardizeColumnText(oldNames, .version = version,
                                    idReplace = idReplace, nameReplace = nameReplace, ...)
  setnames(DT, oldNames, newNames)
  DT
}
# Convert columns whose every value is a textual boolean ("true"/"false",
# "TRUE"/"FALSE", "T"/"F", "t"/"f") into logical columns.
#
# x: a data.table. It is modified in place via data.table::set() and also
#   returned.
# checkLength: unused; kept so existing calls keep working.
# Columns containing NA or any other value are left untouched, and so is a
# zero-row table.
convertLogicalColumns <- function(x, checkLength) {
  if (nrow(x) == 0) {
    return(x)
  }
  trueTokens <- c("true", "T", "t", "TRUE")
  falseTokens <- c("false", "F", "f", "FALSE")
  isBooleanText <- vapply(
    x,
    function(column) all(column %in% c(trueTokens, falseTokens)),
    logical(1)
  )
  # as.logical() returns NA for "t"/"f", so map the accepted tokens explicitly.
  for (j in names(x)[isBooleanText]) {
    set(x, j = j, value = x[[j]] %in% trueTokens)
  }
  x
}
# A couple of functions, because I went through a stage where I loved logging
# Glue every argument into one string with no separator, using the same rules
# message() applies to its arguments. Always returns a single string
# ("" when called with no arguments).
logPrefix <- function(...) {
  as.character(.makeMessage(...))
}
# Base class that gives subclasses a named logger (private$log) writing to the
# console.
#
# componentName: logger name passed to getLogger().
# logLevel: initial level for both the logger and its console handler. Use a
#   number from the logging package's scale (e.g. 10 = DEBUG, 30 = WARNING)
#   or a level name such as "DEBUG".
# ...: stored unchanged in private$initializationDefaults.
loggingRoot <- R6Class("loggingRoot",
  public = list(
    initialize = function(componentName, logLevel = 30, ...) {
      private$log <- getLogger(componentName)
      # Pass the handler as the bare symbol: logging names the handler after
      # this argument, and setLogLevel() looks it up as "writeToConsole".
      private$log$addHandler(writeToConsole)
      self$setLogLevel(logLevel)
      private$log$debug("logger started")
      private$initializationDefaults <- list(...)
    },
    # Set the level on the logger and on its console handler. Both go through
    # setLevel(), so numeric levels and level names are handled the same way.
    # Previously the logger field was assigned directly, which bypassed that.
    setLogLevel = function(level) {
      setLevel(level, private$log)
      setLevel(level, private$log$handlers$writeToConsole)
    }
  ),
  private = list(
    log = NULL,
    initializationDefaults = list()
  )
)
# A class to hold a host entry: address, port and credentials.
host <- R6Class(
  "host",
  public = list(
    host = character(),
    port = numeric(),
    user = character(),
    password = character(),
    # Store the connection details; any extra arguments are accepted and ignored.
    initialize = function(host, port, user, password, ...) {
      self$host <- host
      self$port <- port
      self$user <- user
      self$password <- password
    },
    # Return whatever was passed in `...`, followed by host, password,
    # port (coerced to integer) and user, in that order.
    unpack = function(...) {
      details <- list(
        host = self$host,
        password = self$password,
        port = as.integer(self$port),
        user = self$user
      )
      c(..., details)
    }
  )
)
# A class to hold a SQL host entry: everything a `host` carries, plus the
# database name `db`.
sqlHost <- R6Class(
  "sqlHost",
  inherit = host,
  public = list(
    # `db` must be named; all other arguments go to host$initialize().
    initialize = function(..., db) {
      self$db <- db
      super$initialize(...)
    },
    db = character(),
    # host$unpack() output with db appended as the last element.
    unpack = function(...) {
      parentFields <- super$unpack(...)
      c(parentFields, list(db = self$db))
    }
  )
)
# The actual class that abstracts the JDBC connection.
# Each query() call works on a throw-away single-node PSOCK cluster on this
# machine: it loads DBI/rJava/RJDBC there, connects, runs one statement and
# shuts the node down again. The JVM therefore never lives in the calling
# R session.
jdbcDb <- R6Class("jdbcDb",
  inherit = loggingRoot,
  public = list(
    # host: object with an unpack() method (e.g. a sqlHost) whose result has
    #   host (used as the JDBC URL), user and password.
    # driverClass, classPath: passed to RJDBC::JDBC(). An empty classPath
    #   falls back to "~/redshift/redshiftJDBC41.jar", the jar that earlier
    #   versions of query() always used.
    # identfier.quote: (sic; name kept for compatibility) passed to
    #   RJDBC::JDBC() as identifier.quote when it is a non-empty string.
    # jdbcDbName: logger name. logLevel and ...: see loggingRoot$initialize().
    initialize = function(host, driverClass = "", classPath = "",
                          identfier.quote = "",
                          jdbcDbName = tempvar("jdbcDb"), logLevel = 30,
                          ...) {
      super$initialize(componentName = jdbcDbName, logLevel = logLevel, ...)
      private$log$finest("Initializing")
      private$hostInformation <- host$unpack()
      private$driverClass <- driverClass
      private$classPath <- classPath
      private$identifierQuote <- identfier.quote
      private$jdbcDbName <- jdbcDbName
      private$log$finest("Initialization variables exported")
    },

    # statement: SQL text, optionally containing whisker {{tags}}.
    # template: list of values for those tags.
    # Returns the result as a data.table with standardized column names and
    # textual booleans converted to logical. Any failure (node start, driver,
    # connection or query) is raised as an error after the node is shut down.
    # Earlier versions instead returned the error text as a data.table.
    query = function(statement, template = list()) {
      private$log$finest("Starting query")
      statement <- private$renderStatement(statement, template)

      jvmNode <- private$startNode()
      connected <- FALSE
      # Always release the connection and the node, even when a step fails.
      on.exit({
        if (connected) {
          try(clusterEvalQ(jvmNode, dbDisconnect(conn)), silent = TRUE)
        }
        try(stopCluster(jvmNode), silent = TRUE)
      }, add = TRUE)
      private$log$finest("forkable node started")
      clusterEvalQ(jvmNode, {library(DBI); library(rJava); library(RJDBC)})

      # Node-side code below refers to these names in the node's global env.
      toExport <- list2env(private$nodeSettings(statement))
      clusterExport(jvmNode, ls(toExport), envir = toExport)
      # java.parameters only takes effect if set before the JVM starts.
      clusterEvalQ(jvmNode, options(java.parameters = java.parameters))
      clusterEvalQ(jvmNode, rJava::.jinit())
      private$log$finest("JVM Started")
      # Trailing NULL: don't ship the Java-backed objects back to this session.
      clusterEvalQ(jvmNode, {
        drv <- RJDBC::JDBC(driverClass, classPath, identifier.quote = identifierQuote)
        NULL
      })
      private$log$finest("Driver obtained")
      clusterEvalQ(jvmNode, {
        conn <- dbConnect(drv,
                          url = hostInformation$host,
                          user = hostInformation$user,
                          password = hostInformation$password)
        NULL
      })
      connected <- TRUE
      private$log$debug(logPrefix("query:", statement))
      result <- clusterEvalQ(jvmNode, dbGetQuery(conn, statement))[[1]]
      result <- result %>% as.data.table %>% standardizeColumnNames
      # Convert results that are manifestly boolean into logical columns.
      convertLogicalColumns(result)
    }
  ),
  private = list(
    hostInformation = list(),
    driverClass = NULL,
    classPath = NULL,
    identifierQuote = NULL,
    jdbcDbName = NULL,
    # How many times startNode() tries to bring up a responsive node.
    maxNodeAttempts = 10L,

    # Render whisker {{tags}} in `statement` using `template` plus
    # `redshiftIdioms`, if a list of that name is visible from here (it is not
    # defined in this file; presumably user-supplied). Template values take
    # precedence. Rendering repeats so that tags produced by a pass are
    # expanded too. It stops when no "{{" remains or a pass changes nothing.
    # The old check on a bare "{" looped forever on SQL with literal braces.
    # NOTE(review): whisker HTML-escapes {{var}} values ({{{var}}} does not),
    # and values are spliced into the SQL verbatim -- never feed untrusted
    # input through `template`.
    renderStatement = function(statement, template) {
      data <- c(template, get0("redshiftIdioms", ifnotfound = list()))
      for (i in seq_len(20L)) {
        if (!grepl("{{", statement, fixed = TRUE)) {
          return(statement)
        }
        private$log$fine("nesting into whiskers")
        rendered <- whisker.render(statement, data)
        if (identical(rendered, statement)) {
          return(statement)
        }
        statement <- rendered
      }
      stop("jdbcDb: statement still contains {{ }} tags after 20 rendering passes",
           call. = FALSE)
    },

    # Start a one-worker local cluster on a random port and check that it
    # answers. A node that starts but does not respond is stopped before the
    # next attempt, and startup errors are retried too. After
    # maxNodeAttempts failures an error is raised instead of looping forever.
    startNode = function() {
      lastError <- NULL
      for (attempt in seq_len(private$maxNodeAttempts)) {
        node <- tryCatch(
          makeCluster(1, methods = FALSE, port = sample(11000:11999, 1),
                      rscript_args = "--vanilla"),
          error = identity
        )
        if (inherits(node, "error")) {
          lastError <- node
          next
        }
        alive <- isTRUE(tryCatch(clusterEvalQ(node, 2)[[1]] == 2,
                                 error = function(e) FALSE))
        if (alive) {
          return(node)
        }
        try(stopCluster(node), silent = TRUE)
      }
      stop("jdbcDb: could not start a responsive cluster node after ",
           private$maxNodeAttempts, " attempts",
           if (!is.null(lastError)) paste0(": ", conditionMessage(lastError)),
           call. = FALSE)
    },

    # Build the values the node-side code in query() reads.
    # - java.parameters: the caller's getOption("java.parameters") when set.
    #   Otherwise -Xmx is set to the free RAM in whole GB, when that can be
    #   read; failing that it is NULL (JVM default). Earlier code computed a
    #   fractional -Xmx on the node and then always overwrote it.
    # - identifierQuote: NA (RJDBC's default) unless a non-empty string was
    #   given.
    nodeSettings = function(statement) {
      classPath <- private$classPath
      if (length(classPath) != 1L || is.na(classPath) || !nzchar(classPath)) {
        classPath <- "~/redshift/redshiftJDBC41.jar"
      }
      quote <- private$identifierQuote
      if (length(quote) != 1L || !is.character(quote) || is.na(quote) || !nzchar(quote)) {
        quote <- NA
      }
      javaParameters <- getOption("java.parameters")
      if (is.null(javaParameters)) {
        freeGb <- private$freeRamGb()
        if (is.finite(freeGb) && freeGb >= 1) {
          javaParameters <- paste0("-Xmx", floor(freeGb), "g")
        }
      }
      list(
        statement = statement,
        driverClass = private$driverClass,
        classPath = classPath,
        identifierQuote = quote,
        hostInformation = private$hostInformation,
        java.parameters = javaParameters
      )
    },

    # Free memory in GB read from /proc/meminfo; NA where that file is absent
    # (non-Linux). Uses MemAvailable when the kernel reports it. Otherwise it
    # sums MemFree + Buffers + Cached. The old shell pipeline summed lines
    # 2-4, which double-counts on kernels that report MemAvailable.
    freeRamGb = function() {
      meminfo <- "/proc/meminfo"
      if (!file.exists(meminfo)) {
        return(NA_real_)
      }
      lines <- readLines(meminfo, warn = FALSE)
      kb <- suppressWarnings(
        as.numeric(sub("^[^:]*:[[:space:]]*([0-9]+).*$", "\\1", lines))
      )
      names(kb) <- sub(":.*$", "", lines)
      availableKb <- if (!is.na(kb["MemAvailable"])) {
        kb[["MemAvailable"]]
      } else {
        sum(kb[c("MemFree", "Buffers", "Cached")], na.rm = TRUE)
      }
      availableKb / 1024 / 1024
    }
  )
)
# Define a host. Every setting can be overridden with an environment variable,
# so real credentials need not be hard-coded. The literals are placeholders.
hosts <- list(
  redshiftSql = sqlHost$new(
    host = Sys.getenv("REDSHIFT_JDBC_URL",
                      "jdbc:redshift://yourhost.redshift.amazonaws.com:5439/database_name"),
    user = Sys.getenv("REDSHIFT_USER", "username"),
    password = Sys.getenv("REDSHIFT_PASSWORD", "password"),
    port = as.integer(Sys.getenv("REDSHIFT_PORT", "5439")),
    db = Sys.getenv("REDSHIFT_DB", "database")
  )
)
# Define a connection. Nothing connects yet: each redshiftDb$query() call
# opens and closes its own connection.
redshiftDb <- jdbcDb$new(
  hosts$redshiftSql,
  driverClass = "com.amazon.redshift.jdbc41.Driver",
  classPath = Sys.getenv("REDSHIFT_JDBC_JAR", "~/redshift/redshiftJDBC41.jar"),
  logLevel = 10
)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.