Skip to content

Instantly share code, notes, and snippets.

@saptarshiguha
Created May 8, 2017 17:28
Show Gist options
  • Save saptarshiguha/f5aa650003926a7ac6f96c7df93f5c71 to your computer and use it in GitHub Desktop.
# Detect whether this session is running on the author's laptop (macOS /
# "darwin" platform string) rather than a remote Linux host; used below to
# pick the correct SSH public-key path.
islocaltolaptopq <- grepl("darwin",R.version['platform'])
library(mozaws)
# Initialize the mozaws EC2/EMR connection.
# NOTE(review): the key name, S3 buckets, instance profile, and config URL
# are Mozilla-infrastructure specific — verify they are still current before
# reuse.
aws.init(ec2key="20161025-dataops-dev"
,localpubkey = if(islocaltolaptopq) "~/mz/awsssh.pub" else "~/software/custom/awsssh.pub"
,opts = list(loguri= "s3://mozilla-metrics/share/logs/"
,s3bucket = "mozilla-metrics/share/bootscriptsAndR"
,timeout = as.character(as.integer(1440)) # idle timeout in minutes, passed as a string
,ec2attributes = "InstanceProfile='telemetry-spark-cloudformation-TelemetrySparkInstanceProfile-1SATUBVEXG7E3'"
,configfile="https://s3-us-west-2.amazonaws.com/telemetry-spark-emr-2/configuration/configuration.json"
))
# Pin the EMR release label used when creating clusters.
aws.options(releaselabel="emr-5.2.1")
mozilla.init <- function(cl){
  # Run the EMR step that clones the mozillametricstools repo onto the
  # cluster (via the generic run.user.script.sh runner stored in the
  # configured S3 bucket).
  #
  # Args:
  #   cl: a mozaws cluster object.
  # Returns: the cluster object as updated by aws.step.run.
  cl <- aws.step.run(cl,
    script = sprintf('s3://%s/run.user.script.sh', aws.options()$s3bucket),
    args = "https://raw.githubusercontent.com/saptarshiguha/mozillametricstools/master/common/mozilla.init.sh",
    name = "Clone Our Repo",
    wait = 20)
  # FIX: the original's last expression was the assignment itself, whose
  # value is returned invisibly; return the updated cluster explicitly so
  # callers can reliably capture it.
  cl
}
spark.make.moz.clus <- function(awsOpts=aws.options(),workers=1,timeout=1440,wait=TRUE,name=NULL){
  # Create a Mozilla telemetry EMR cluster running Spark/Hive/Hadoop/Zeppelin
  # with the standard telemetry bootstrap action.
  #
  # Args:
  #   awsOpts: options list from aws.options(); supplies localpubkey and user.
  #   workers: number of worker instances to request.
  #   timeout: idle timeout in minutes, forwarded to the bootstrap script.
  #   wait:    forwarded to aws.clus.create (whether to wait for readiness).
  #   name:    optional cluster name.
  # Returns: the cluster object from aws.clus.create.
  #
  # BUG FIX: the original hard-coded workers=1 in the aws.clus.create call,
  # silently ignoring the `workers` argument. Pass it through (the default is
  # still 1, so existing callers are unaffected).
  cl <- aws.clus.create(workers=workers, ver=TRUE,
    applications=c("Spark","Hive","Hadoop","Zeppelin"),
    wait=wait,
    name=name,
    bsactions = list(
      "setup-telemetry-cluster" = c('s3://telemetry-spark-emr-2/bootstrap/telemetry.sh',
        "--public-key" = awsOpts$localpubkey,
        # Pass the user's email to the bootstrap when configured, else "".
        "--email" = if(!is.null(awsOpts$user)) awsOpts$user else "",
        "--efs-dns" = "fs-d0c30f79.efs.us-west-2.amazonaws.com",
        "--timeout" = as.integer(timeout))
    ))
  cl
}
# Convenience wrapper: create a one-node Mozilla Spark cluster, run the repo
# bootstrap step, then (for n > 1) grow it with n-1 spot workers, polling
# until enough of them are running.
#
# Args:
#   n:    total desired node count; n-1 spot instances are added.
#   wait: seconds to sleep between polls of the spot instance group.
#   spot: spot bid price forwarded to aws.modify.groups.
# Returns: the (possibly updated) cluster object.
spark <- function(n=1,wait=10,spot=0.8) {
awsOpts <- aws.options()
cl <- spark.make.moz.clus(awsOpts)
# Only bootstrap/grow when the cluster reached the WAITING (ready) state.
if(identical("WAITING",cl$Status$State)){
cat("Running the Step to add mozillametricstools code\n")
mozilla.init(cl)
print(cl)
if(n>1) cl <- aws.modify.groups(cl, n-1, spot = spot)
print(cl)
# Poll until the spot group reports at least (n-1)/3 running instances.
# NOTE(review): this loops forever if spot capacity never materializes, and
# the 1/3 threshold looks like a deliberate "good enough to start"
# heuristic — confirm before relying on full capacity.
while(TRUE){
cl <- aws.clus.info(cl)
# Spot instance groups are identified by the presence of a BidPrice field.
ig <- Filter(function(s) !is.null(s$BidPrice),cl$InstanceGroups)
if(length(ig)>0 && ig[[1]]$RunningInstanceCount>=(n-1)/3) break
if(length(ig)>0) print(sprintf("Sleeping, since count of group is: %s",ig[[1]]$RunningInstanceCount))
Sys.sleep(wait)
}
}
cl
}
awsob <- function(cl,pt = "/tmp/pyobject.json"){
  # Copy a file from the cluster's master node via scp and return its entire
  # contents as a single character string.
  #
  # Args:
  #   cl: cluster object providing $MasterPublicDnsName.
  #   pt: remote path on the master node to fetch.
  # Returns: the file contents as one character scalar.
  #
  # FIX: the original copied into the fixed path /tmp/dummy321231, which is
  # collision-prone across sessions and was never cleaned up. Use a session
  # tempfile and remove it on exit.
  local <- tempfile()
  on.exit(unlink(local), add = TRUE)
  # NOTE(review): shells out to scp — assumes passwordless SSH access to
  # hadoop@<master>; confirm keys are set up.
  system(sprintf("scp hadoop@%s:%s %s", cl$MasterPublicDnsName, pt, local))
  readChar(local, n = file.info(local)$size)
}
# Fetch a JSON file of records from the cluster (via awsob) and optionally
# assemble it into a data.table.
#
# Args:
#   cl: cluster object passed through to awsob.
#   pt: remote path of the JSON file on the master node.
#   dt: if TRUE, convert the parsed list of records into a data.table;
#       if FALSE, return the raw parsed list.
# Returns: a data.table (dt=TRUE) or the parsed list (dt=FALSE).
awsjs <- function(cl,pt = "/tmp/pyobject.json",dt=TRUE){
# Fallback used when records are unnamed: assumes each record is a pair
# whose second element binds row-wise into a table and whose first element
# is a fixed-length list of scalar fields. NOTE(review): this shape is
# inferred from the code, not documented — verify against the producer.
tryOther <- function(l){
f1 <- data.table(do.call(rbind,lapply(l, "[[",2)))
s1 <- lapply(l,"[[",1)
u <- list()
for( a in seq_along(s1[[1]])){
# NULL/zero-length cells become NA so unlist keeps one value per record.
u[[ a ]] <- unlist(lapply(s1,function(k) { y = k[[a]]; if(is.null(y) || length(y)==0) NA else y}))
# Preserve the column class observed in the first record.
class(u [[ a]]) <- class(s1[[1]][[a]])
}
cbind(as.data.table(u),f1)
}
require(data.table)
require('rjson')
y <- awsob(cl,pt)
l <- fromJSON(y)
if(dt){
vnames <- names(l[[1]])
# Unnamed records: fall back to the pair-shaped assembly above.
if(is.null(vnames)) return(tryOther(l))
v <- list()
for(x in vnames){
f <- lapply(l, "[[",x)
# Heuristic: sample up to 100 records to decide whether the column is
# scalar-valued; scalar columns are unlisted, others kept as list columns.
# NOTE(review): sampling can misclassify a rarely-non-scalar column.
isColumnScalar <- all(unlist(lapply(sample(f, min(100, length(f))),length))==1)
if(isColumnScalar) v[[ x ]] <- unlist(f) else v[[x]] <- f
}
as.data.table(v)
}else l
}
awsDF <- function(cl,pt = "/tmp/pyobject.json",colnames=NULL){
  # Fetch a JSON file of row-records from the cluster and assemble it into a
  # data.table column by column, replacing NULL/zero-length cells with NA.
  #
  # Args:
  #   cl:       cluster object passed through to awsob.
  #   pt:       remote path of the JSON file on the master node.
  #   colnames: optional character vector of column names to set.
  # Returns: a data.table with one column per field of the first record.
  require(data.table)
  require('rjson')
  y <- awsob(cl,pt)
  l <- fromJSON(y)
  # Column count is taken from the first record; assumes all records share
  # its length. NOTE(review): ragged records would recycle/misalign — verify.
  ncols <- length(l[[1]])
  # FIX: preallocate instead of growing the list every iteration, and use
  # seq_len so ncols == 0 yields zero iterations (1:0 would iterate c(1, 0)).
  x <- vector("list", ncols)
  pb <- makePB(l = ncols)
  for(i in seq_len(ncols)){
    x[[i]] <- unlist(lapply(l, function(s) { v <- s[[i]]; if(is.null(v) || length(v)==0) NA else v }))
    pb$tick(1)
  }
  u <- do.call(data.table, x)
  if(!is.null(colnames)) setnames(u, colnames)
  u
}
installlibs <- function(cl){
  # Submit the EMR step that runs spark.init.step.sh (library installation /
  # Spark initialization) via the generic run.user.script.sh runner stored
  # in the configured S3 bucket, waiting for the step to complete.
  #
  # Args:
  #   cl: a mozaws cluster object.
  # Returns: the cluster object as updated by aws.step.run.
  runner <- sprintf('s3://%s/run.user.script.sh', aws.options()$s3bucket)
  step.script <- "https://raw.githubusercontent.com/saptarshiguha/mozillametricstools/master/common/spark.init.step.sh"
  cl <- aws.step.run(cl,
    script = runner,
    args = step.script,
    name = "Clone Our Repo",
    wait = TRUE)
  cl
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment