---
title: The `mozaws` package and its use of the AWS cli tools
output:
html_document:
mathjax: default
toc: true
toc_depth: 3
toc_float: true
theme: paper
highlight: haddock
fig_width: 6
fig_height: 4
fig_caption: true
code_folding: show
self_contained: true
---
<style>
pre code, pre, code {
white-space: pre !important;
overflow-x: scroll !important;
word-break: keep-all !important;
word-wrap: initial !important;
}
</style>
We use the mozaws package [^1] for administering our AWS EMR
clusters. The requirements for using it are
- an AWS account (obtained by filing this bug: )
- a few settings so that the user has access to the telemetry data sets
Once these have been provided, the package can be installed using `R CMD
INSTALL path/to/mozaws`.
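A minimal install sketch follows; the local path is a placeholder, and the
`devtools::install_github` route (with the package living in the `mozaws`
subdirectory of the repository) is an assumption rather than a documented
install path.
```{r eval=FALSE}
## from a local clone of the repository (path is a placeholder)
system("R CMD INSTALL path/to/mozaws")

## or, directly from GitHub (assumption: the package sits in the "mozaws"
## subdirectory of the repo)
devtools::install_github("saptarshiguha/mozaws", subdir = "mozaws")
```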
The package attempts to provide an idiomatic R wrapper around the command line
`aws ...` commands (e.g. `aws emr create-cluster`).
## Why We Need This
- **configurability** we can customize the environment that gets set up on our master
node, and we can do everything ourselves, on an individual basis. If we need a
certain package installed (e.g. when starting to use the Rmd stack), we can just
set it up quickly for our own clusters
- **scriptability** all of our interactions with the cluster can be tied into our R session
(e.g. `.First` and `.Last`). You could do this in bash too, but it's more of a pain,
and it's something you can't do at all with a web interface
- **monitoring** the status monitor includes all relevant info. It is very useful to
have the number of records and the data size, so that we have an idea of what's
happening if the tasks aren't progressing (e.g. it's reading really big files, or it's
doing a shuffle). These aren't present in the current Jupyter UI
## Initialization
Before using the mozaws commands, the package needs to be initialized. This sets
the SSH public key (for ssh'ing into the cluster), the EC2 keys, the startup
scripts (to install the Mozilla Spark libraries), etc.
For example, here is my setup:
```{r eval=FALSE}
library(mozaws)
aws.init(ec2key="20161025-dataops-dev"
,localpubkey = "~/mz/awsssh.pub"
,opts = list(loguri= "s3://mozilla-metrics/share/logs/"
,s3bucket = "mozilla-metrics/share/bootscriptsAndR"
,timeout = as.character(as.integer(1440))
,ec2attributes = "InstanceProfile='telemetry-spark-cloudformation-XYZ...'"
,configfile="https://s3-us-west-2.amazonaws.com/telemetry-spark-emr-2/configuration/configuration.json"
))
aws.options(releaselabel="emr-5.2.1")
```
In the above,
- we set the EC2 key and the SSH public key
- `loguri` is where logs are kept
- `s3bucket` is where mozaws keeps several scripts to install packages, run
shell scripts, etc.
- `ec2attributes` and `configfile` are used in the `create-cluster` invocation
Once `aws.init` has been called we are ready to call `aws cli` commands through
the corresponding mozaws API.
Typical usage then begins with creating a cluster, growing it, running
steps, uploading/downloading files, monitoring job progress, etc.
## Starting a cluster
Our workflow is best understood by examining a function I wrote to start a
cluster and install packages:
```{r eval=FALSE}
spark <-
function(n=1,wait=10,spot=0.8)
{
cl <- aws.clus.create(workers=1,spark=TRUE,ver=TRUE,applications=c("Spark","Hive","Hadoop")
,wait=wait
)
if(identical("WAITING",cl$Status$State)){
cat("Running the Step to add mozillametricstools code\n")
spark.init(cl)
spark.rhipe(cl)
print(cl)
if(n>1) cl <- aws.modify.groups(cl, n-1, spot = spot)
print(cl)
while(TRUE){
cl <- aws.clus.info(cl)
ig <- Filter(function(s) !is.null(s$BidPrice),cl$InstanceGroups)
if(length(ig)>0 && ig[[1]]$RunningInstanceCount>=(n-1)/3) break
if(length(ig)>0) print(sprintf("Sleeping, since count of group is: %s",ig[[1]]$RunningInstanceCount))
Sys.sleep(wait)
}
}
cl
}
```
This is called as `spark(n=12)`. Let's see how it works.
```{r eval=FALSE}
cl <- aws.clus.create(workers=1,spark=TRUE,ver=TRUE,applications=c("Spark","Hive","Hadoop")
,wait=wait
)
```
will start a cluster with one on-demand worker, installing Spark, Hive and
Hadoop. The output is verbose (`ver=TRUE`) and the call polls `describe-cluster`
every `wait` seconds to monitor the status of the cluster.
The type of the master and worker nodes are taken from `aws.options` e.g.
```{r eval=FALSE}
> aws.options()$inst.type
worker master
"c3.4xlarge" "c3.4xlarge"
```
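The defaults can be overridden before creating a cluster. A hedged sketch
follows; that `aws.options` accepts `inst.type` as a named vector of the same
shape as the printed default above is an assumption.
```{r eval=FALSE}
## assumption: inst.type can be set the same way it is read
aws.options(inst.type = c(worker = "c3.8xlarge", master = "c3.4xlarge"))
aws.options()$inst.type
```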
A limitation of `aws.clus.create` (see [^2] for the code) is that it needs one
on-demand node. This limitation can be easily fixed.
The call to `aws.clus.create` ends up being an `aws emr create-cluster` call that looks like
```
aws emr create-cluster --configurations https://s3-us-west-2.amazonaws.com/telemetry-spark-emr-2/configuration/configuration.json --applications Name=Spark Name=Hive Name=Hadoop --service-role EMR_DefaultRole --tags Owner='[email protected]' crtr='rmozaws-1' --visible-to-all-users --release-label 'emr-5.2.1' --log-uri 's3://mozilla-metrics/share/logs/' --name '[email protected] cluster: 3' --enable-debugging --ec2-attributes KeyName='20161025-dataops-dev',InstanceProfile='telemetry-spark-cloudformation-TelemetrySparkInstanceProfile-1SATUBVEXG7E3' --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=1,InstanceType=c3.4xlarge --bootstrap-actions Path='s3://telemetry-spark-emr-2/bootstrap/telemetry.sh',Args=['--public-key,ssh-rsa AAAAB...','--timeout,1440']
..........
```
and typing `cl` returns the following (this uses R's S3 print method [^5])
```
This Information as of: 23.69 minutes ago
Cluster ID : j-2UY92VI78CLME
Name : '[email protected] cluster: 3'
State : WAITING
Reason : Cluster ready after last step completed.
Started At : 2017-01-26 08:01:09
Message : Cluster ready after last step completed.
IP : ec2-XXX.compute.amazonaws.com
SSH : ssh [email protected] (assuming did aws.init(localpub=your-pub-key)
else ssh -i /Users/sguha/gdrive/mz/awsssh [email protected]
SOCKS : ssh -ND 8157 [email protected] (and use FoxyProxy for Firefox or SwitchySharp for Chrome)
JobTracker : http://XXX.us-west-2.compute.amazonaws.com:9026 (needs a socks)
Spark UI : http://localhost:8888 but first run ssh -L 8888:localhost:8888 [email protected]
Master Type : c3.4xlarge (and is running: TRUE)
Core Nodes : 1 of c3.4xlarge
```
## Waiting for the Cluster to Start
If you didn't use `wait` to start your cluster, i.e. you started it
asynchronously, you can wait for it to start using `aws.clus.wait` [^3]. This
function takes the cluster object `cl` and a polling interval (e.g. 10 seconds)
and calls `describe-cluster`. If a call receives a throttling error (i.e. too
many requests in a short window), it backs off (similar to exponential
back-off). Once the cluster has started or failed, the updated `cl` object is
returned.
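A sketch of the asynchronous path follows; that `wait=FALSE` skips polling in
`aws.clus.create`, and the exact argument order of `aws.clus.wait`, are
assumptions based on the description above.
```{r eval=FALSE}
## start the cluster without blocking (assumption: wait=FALSE disables polling)
cl <- aws.clus.create(workers=1, spark=TRUE, ver=TRUE,
                      applications=c("Spark","Hive","Hadoop"),
                      wait=FALSE)
## ... do other work ...
## block until the cluster is WAITING (or has failed), polling every 10 seconds
cl <- aws.clus.wait(cl, 10)
```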
## Has the Cluster Started Successfully?
Now we have a cluster that has finished starting, but it could also have
failed. Hence we have the check
```{r eval=FALSE}
...
if(identical("WAITING",cl$Status$State)){
...
```
## Configurable: Running Steps (customizing the cluster/running remote jobs)
If the cluster is `WAITING`, it is ready to be used. If this is the case, we now install some custom Python scripts and R packages.
```{r eval=FALSE}
spark.init(cl)
spark.rhipe(cl)
```
> **configurability** We can customize the environment that gets set up on our master
> node, and we can do everything ourselves, on an individual basis. If we need a
> certain package installed (e.g. when starting to use the Rmd stack), we can just
> set it up quickly for our own clusters. `aws.step.run` is the key to this.
`spark.init` is a wrapper around the following mozaws call. This step installs
the mozillametricstools Python library and some other Python packages (e.g. feather).
```{r eval=FALSE}
> cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
, args="https://raw.githubusercontent.com/saptarshiguha/mozillametricstools/master/common/spark.init.step.sh"
, name="Clone Our Repo"
, wait=TRUE)
Running step with Id (see logs at /mnt/var/log/hadoop/steps/s-3IJ8LT9AGTF1K) : s-3IJ8LT9AGTF1K
```
Steps can be used to submit PySpark/Spark/R jobs to the cluster and have them
run. Steps can be made to wait on completion (`wait`), or they can be run
asynchronously and monitored. The call `aws.step.run` [^4] is a wrapper around
`emr add-steps`. We can run a job, wait for its output and then upload the log
files (standard error, standard output, etc.). The following function shows how
it can be done (given the step id).
```{r eval=FALSE}
uploadLogsToS3 <- function(stepID,JNAME){
zipper <- sprintf("#!/bin/sh
cd /mnt/var/log/hadoop/steps/%s
tar cvfz mylog-%s.tar.zip .
aws s3 cp mylog-%s.tar.zip s3://mozilla-metrics/user/sguha/tmp/
", ID,JNAME, JNAME)
x <- tempfile()
writeLines(zipper,x)
cat(sprintf("Uploading %s to s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh\n",x,JNAME))
system(sprintf("aws s3 cp %s s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh",x,JNAME))
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
, args=sprintf("s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh",JNAME)
, name=sprintf("Zipping Logs %s",JNAME)
, wait=TRUE)
print(sprintf("logs can be found at s3://mozilla-metrics/user/sguha/tmp/mylog-%s.tar.zip", JNAME))
}
```
There are multiple ways to do the above (e.g. running a remote command on the
cluster via SSH); this is just one.
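The same pattern can be used to submit a batch Spark job as a step: the shell
script named in `args` is downloaded and executed on the master. A hedged
sketch follows; the S3 path and the contents of `myjob.sh` (a hypothetical
wrapper that would call `spark-submit` on an uploaded PySpark file) are
illustrative assumptions.
```{r eval=FALSE}
## submit a batch job as a step without blocking; monitor it later
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
          , args="s3://mozilla-metrics/user/sguha/tmp/myjob.sh"   # hypothetical wrapper script
          , name="Run my batch Spark job"
          , wait=FALSE)
```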
The next step, `spark.rhipe`, installs the RHIPE R package and pandoc.
```{r eval=FALSE}
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
, args="https://raw.githubusercontent.com/saptarshiguha/mozaws/master/bootscriptsAndR/setupRhipe.sh"
, name="Install RHIPE"
, wait=TRUE)
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
, args="https://raw.githubusercontent.com/saptarshiguha/mozaws/master/bootscriptsAndR/installlPandoc.sh"
, name="Install Pandoc (might take some time)"
, wait=TRUE)
```
All of these steps download the shell scripts given in the `args` parameter and
execute them. To see what they do, follow the URLs.
> It is important to be able to write to S3. The function `uploadLogsToS3`
> uploads its files to S3, though uploading to a public website would work too.
## Growing the cluster
> **modifying running clusters** - we can optimize efficiency of
> our cluster usage by starting and killing nodes as needed. Eg: run an intensive
> data pull with many nodes, then kill all but a few to develop an
> analysis. Rather than start multiple clusters, we can adjust the size of our
> clusters.
We now need to add more nodes. We do this using
```{r eval=FALSE}
...
aws.modify.groups(cl, n-1, spot = spot)
...
```
In the above call we pass $n-1$ because one node already became an on-demand
node (a bug). When this is called, it returns immediately and we need to
monitor the cluster to see if the nodes have started. `aws.modify.groups` [^6]
wraps `modify-instance-groups` and `add-instance-groups`. It can add groups of
nodes (spot/on-demand), change their size and delete them.
```{r eval=FALSE}
> aws.modify.groups(cl, 10)
Using a spot price of 0.1764
```
The monitoring loop (note that a back-off algorithm is not used, which is a bug):
```{r eval=FALSE}
while(TRUE){
cl <- aws.clus.info(cl)
ig <- Filter(function(s) !is.null(s$BidPrice),cl$InstanceGroups)
if(length(ig)>0 && ig[[1]]$RunningInstanceCount>=(n-1)/3) break
if(length(ig)>0) print(sprintf("Sleeping, since count of group is: %s",ig[[1]]$RunningInstanceCount))
Sys.sleep(wait)
}
```
We see a new command here: `aws.clus.info` [^7]. This wraps `describe-cluster` and
`list-steps` and always retrieves the latest information.
In the above code, we
- loop
- get the cluster information
- filter for the instance groups that are spot nodes (`!is.null(s$BidPrice)`);
this is not generalized, but works here because we have only one group with
spot nodes (groups can, however, be named, which makes the searching easier)
- break once at least $(n-1)/3$ of the nodes have started (since the others will
start very soon)
- otherwise wait (we should ideally use some sort of back-off algorithm to prevent
AWS throttling; see the sketch after this list)
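A sketch of the same loop with a capped exponential back-off between polls,
which would reduce the chance of AWS throttling; this is not part of mozaws,
just one possible fix for the bug noted above (it reuses `cl`, `n` and `wait`
from the enclosing `spark()` function).
```{r eval=FALSE}
sleepFor <- wait
while(TRUE){
  cl <- aws.clus.info(cl)
  ig <- Filter(function(s) !is.null(s$BidPrice), cl$InstanceGroups)
  if(length(ig)>0 && ig[[1]]$RunningInstanceCount>=(n-1)/3) break
  if(length(ig)>0) print(sprintf("Sleeping %s sec, count of group is: %s",
                                 sleepFor, ig[[1]]$RunningInstanceCount))
  Sys.sleep(sleepFor)
  sleepFor <- min(sleepFor*2, 300)  # double the wait between polls, capped at 5 minutes
}
```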
Once this loop ends, our nodes have started. If we call `aws.clus.info` and
print the result, it will look like
```
This Information as of: 0.79 seconds ago
Cluster ID : j-2A4P0QLGUXTBC
Name : '[email protected] cluster: 4'
State : WAITING
Reason : Cluster ready after last step completed.
Started At : 2017-01-27 12:06:57
Message : Cluster ready after last step completed.
IP : X.compute.amazonaws.com
SSH : ssh [email protected] (assuming did aws.init(localpub=your-pub-key)
else ssh -i /Users/sguha/gdrive/mz/awsssh [email protected]
SOCKS : ssh -ND 8157 [email protected] (and use FoxyProxy for Firefox or SwitchySharp for Chrome)
JobTracker : http://ec2-54-218-105-169.us-west-2.compute.amazonaws.com:9026 (needs a socks)
Spark UI : http://localhost:8888 but first run ssh -L 8888:localhost:8888 [email protected]
Master Type : c3.4xlarge (and is running: TRUE)
Core Nodes : 1 of c3.4xlarge
Number of Instance Groups: 1
ID:ig-3AHNJ80GBD3FJ, name: '' state:RUNNING requested:11 (at $0.8), running: 11
https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#cluster-details:j-2A4P0QLGUXTBC
```
## Sending and getting files
The commands `aws.get` and `aws.send` (see [^8] and [^9]) are wrappers around
`scp` (soon to be changed to `rsync`) for uploading and downloading files, using
sane defaults (e.g. the default download destination is the home folder).
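A hedged usage sketch; the argument order (cluster object first, then a path)
and the file names are assumptions for illustration.
```{r eval=FALSE}
## copy a local script up to the master node
aws.send(cl, "~/analysis/pull.R")
## fetch a result file from the master node (downloads into the home folder by default)
aws.get(cl, "results.csv")
```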
## Monitoring a Job ...
> **monitoring** the status monitor includes all relevant info. It is very useful to
> have the number of records and the data size, so that we have an idea of what's
> happening if the tasks aren't progressing (e.g. it's reading really big files, or
> it's doing a shuffle). These aren't present in the current Jupyter UI
Now that we've started a cluster and are merrily running jobs, we would like to
monitor progress. This function is a work in progress. It has the huge drawback
of making repeated `ssh` calls to the cluster. One idea for a replacement is to
- encourage the use of a specific AWS EMR ssh key
- using this key, open a reverse ssh tunnel into the cluster
- run the script on the server
The code for this is at [^10]. It is called as
```{r eval=FALSE}
monitorCurrentSparkApplication(cl)
```
And its output looks like
```
13:00:41> monitorCurrentSparkApplication(cl)
App:application_1485547995945_0007 Job[id:2, name:'csv at NativeMethodAccessorImpl.java:-2'] started at: 2017-01-27 21:01:32, duration: 0.86 min
Tasks(c,f/all,%): 1031,0/1803,57.2% Stages(c,f/all,%): 0,0/2,0%
stageId name status tActive tCompleted tFailed tInputBytes tOutputBytes tInputRecords tOutputRecords shufflesBytes
1: 3 csv at NativeMethodAccessorImpl.java:-2 PENDING 0 0 0 0 bytes 0 bytes 0 0 0 bytes
2: 2 csv at NativeMethodAccessorImpl.java:-2 ACTIVE 192 1060 0 31.6 GB 0 bytes 1483941478 0 0 bytes
```
What we like about it:
- the number of tasks and stages
- total input records and bytes
- output records and bytes
- percentages complete
## No Zombie Clusters (killing clusters)
The command `aws.kill(cl)` (which wraps `terminate-clusters`) will kill the
cluster. How can we be sure all clusters have been killed? The command
`aws.clus.list` [^11] (which wraps `list-clusters`) lists all clusters and
defaults to active ones.
> **scriptability** all of our interactions with the cluster can be tied into our R session
> (eg. .First and .Last) you could do this in bash too but it’s more of a pain
> and it's something you can’t do at all with a web interface
- get this list, passing the `since` parameter, which is a date,
e.g. `Sys.Date() - 5` would give all active clusters from the last 5 days
- select mine
- kill them all
```{r eval=FALSE}
> y <- aws.clus.list(active=TRUE, since=Sys.Date()-2)
> y[[1]]
id name nhrs started state stageChangeMessage elapsed
1: j-PB5VY5J6FI6Y 4444b809-8442-43d3-bc9d-ead0bd919692 0 2017-01-27 12:08:07 BOOTSTRAPPING Running bootstrap actions 6.067458 mins
2: j-2A4P0QLGUXTBC [email protected] cluster: 4 0 2017-01-27 12:06:57 BOOTSTRAPPING Running bootstrap actions 7.244208 mins
3: j-3M5O9010L5768 6ff3f66f-5970-44e2-b1c9-6a48d7408896 64 2017-01-27 11:53:23 RUNNING Running step 20.801425 mins
4: j-1KHV1UURAF86 9ad7056c-d948-493b-bb0d-72178d1104a2 64 2017-01-27 11:48:08 RUNNING Running step 26.049925 mins
5: j-NU9XI4DFFQBN 1d62ce9b-1e4b-4aae-ba68-4764bed5712b 32 2017-01-27 11:47:49 WAITING Cluster ready to run steps. 26.374041 mins
6: j-1Y552WILUUEGD 80b52010-e258-4823-95c8-c56edb27f572 320 2017-01-27 10:32:35 WAITING Cluster ready to run steps. 101.608275 mins
7: j-H1YJIRH7148W 2499e517-4feb-44fc-b8a2-b5ec95a576eb 1120 2017-01-27 09:59:43 WAITING Cluster ready to run steps. 134.467441 mins
8: j-2LXIVWYT2R4I3 b6f52377-117a-4b76-abe7-96bfb4578229 1376 2017-01-27 09:25:47 WAITING Cluster ready to run steps. 168.414441 mins
9: j-E93PRSE5Y14T 3b211770-fc8e-4e10-b379-7e7e13865d2f 864 2017-01-27 09:24:47 WAITING Cluster ready to run steps. 169.405091 mins
10: j-2IWLGGHD3Z0RK ad57c3a5-d00e-4ccc-9c39-ce96b02c77c5 512 2017-01-27 08:51:45 WAITING Cluster ready to run steps. 202.447691 mins
11: j-3ED2OFB15A26T telemetry-presto-20170127-1485532855 1312 2017-01-27 08:01:03 WAITING Cluster ready to run steps. 253.141308 mins
12: j-1KAZ4795XXZJW c7530877-b1e3-4fee-8901-a3891c30aa41 1440 2017-01-27 07:51:54 WAITING Cluster ready to run steps. 262.287358 mins
13: j-1PSPVD1O4SNAV 6faf7b95-296d-4b59-8f9f-eea402a93b86 160 2017-01-27 07:35:34 WAITING Cluster ready to run steps. 278.632908 mins
14: j-AV5ZWKKC2NC5 ebafe0b1-ecba-45b9-b95f-9f4d37c073be 11808 2017-01-26 12:42:39 WAITING Cluster ready to run steps. 1411.540741 mins
15: j-9ETW1T9CUJGG 8d651125-7a39-48cf-8788-1e3d513d220d 2976 2017-01-26 12:28:01 WAITING Cluster ready to run steps. 1426.177591 mins
> sapply(y[grepl("sguha",name),id],function(cid) aws.kill(as.awsCluster(cid)))
```
This routine can be kept in R's `.Last` function, which runs when R quits, as sketched below.
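A hedged sketch of such a `.Last` hook, built from the calls shown above; the
`"sguha"` filter is specific to my clusters and the error handling is an
illustrative assumption.
```{r eval=FALSE}
.Last <- function() {
  tryCatch({
    ## list clusters active over the last two days and kill the ones that are mine
    y <- aws.clus.list(active=TRUE, since=Sys.Date()-2)
    sapply(y[grepl("sguha", name), id], function(cid) aws.kill(as.awsCluster(cid)))
  }, error=function(e) invisible(NULL))
}
```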
## Other Cool Things
We like data, so we also like seeing spot prices. The following function
(`aws.spot.price` [^12]) takes the default machine type and region (from
`aws.options`) and retrieves the spot prices. It wraps `ec2 describe-spot-price-history`.
```{r cache=TRUE,dev='svg',fig.cap="Spot Price History for 'c3.4xlarge', Linux systems", fig.width=9,fig.height=6,fig.align='center'}
library(lattice)
y <- aws.spot.price()
xyplot( SpotPrice ~ Timestamp,groups=AvailabilityZone, type=c('p','smooth','g'),auto.key=TRUE,data=y,xlab='Time',ylab='$')
```
## AWS EMR commands used
- list-clusters
- terminate-clusters
- describe-cluster
- list-steps
- describe-step
- add-steps
- modify-instance-groups
- add-instance-groups
- create-cluster
We also use `aws s3` for deleting output, retrieving files, etc.
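A hedged sketch of the kind of ad-hoc `aws s3` calls we shell out to from R;
the paths are hypothetical.
```{r eval=FALSE}
## fetch a result file from S3 into the current directory
system("aws s3 cp s3://mozilla-metrics/user/sguha/tmp/results.csv .")
## delete an old output directory
system("aws s3 rm --recursive s3://mozilla-metrics/user/sguha/tmp/old-output/")
```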
## References
[^1]: https://github.com/saptarshiguha/mozaws
[^2]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L147
[^3]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L298
[^4]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L468
[^5]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L350
[^6]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L506
[^7]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L329
[^8]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L426
[^9]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L411
[^10]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/tracker.R#L199
[^11]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L20
[^12]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L489