---
title: The `mozaws` package and its use of the AWS cli tools
output:
  html_document:
    mathjax: default
    toc: true
    toc_depth: 3
    toc_float: true
    theme: paper
    highlight: haddock
    fig_width: 6
    fig_height: 4
    fig_caption: true
    code_folding: show
    self_contained: true
---
<style>
pre code, pre, code {
  white-space: pre !important;
  overflow-x: scroll !important;
  word-break: keep-all !important;
  word-wrap: initial !important;
}
</style>
We use the mozaws package [^1] for administering our AWS EMR
clusters. The requirements for using it are
- an AWS account (obtained by filing this bug: )
- a few settings so that the user has access to the telemetry data sets

Once these have been provided, the package can be installed using `R CMD INSTALL <path to mozaws>`.
The package attempts to provide an idiomatic R wrapper around the command line
`aws ...` commands (e.g. `aws emr create-cluster`).
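Conceptually, each wrapper builds an `aws` command line, runs it, and parses the JSON
response back into R. A minimal sketch of that idea (not the package's actual
implementation; the function name and arguments here are illustrative):

```{r eval=FALSE}
library(jsonlite)

## Illustrative only: shell out to the AWS CLI and parse its JSON output.
emr.call <- function(subcommand, args = character(0)) {
  out <- system2("aws", c("emr", subcommand, args), stdout = TRUE)
  fromJSON(paste(out, collapse = "\n"))   # the AWS CLI returns JSON by default
}

## e.g. list the currently active clusters
## emr.call("list-clusters", "--active")
```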
## Why We Need This
- **configurability**: we can customize the environment that gets set up on our master
  node, and we can do everything ourselves, on an individual basis. If we need a
  certain package installed (e.g. when starting to use the Rmd stack), we can set it
  up quickly for our own clusters.
- **scriptability**: all of our interactions with the cluster can be tied into our R session
  (e.g. `.First` and `.Last`). You could do this in bash too, but it's more of a pain,
  and it's something you can't do at all with a web interface.
- **monitoring**: the status monitor includes all the relevant information. It is very useful to
  have the number of records and the data size, so that we have an idea of what's happening if the
  tasks aren't progressing (e.g. it's reading really big files, or it's doing a
  shuffle). These aren't present in the current Jupyter UI.
## Initialization
Before using the mozaws commands, the package needs to be initialized. This sets
the SSH public key (for ssh'ing into the cluster), the EC2 keys, the startup
scripts (to install the Mozilla Spark libraries) and so on.
For example, in my setup:
```{r eval=FALSE}
library(mozaws)
aws.init(ec2key="20161025-dataops-dev"
        ,localpubkey = "~/mz/awsssh.pub"
        ,opts = list(loguri= "s3://mozilla-metrics/share/logs/"
                    ,s3bucket = "mozilla-metrics/share/bootscriptsAndR"
                    ,timeout = as.character(as.integer(1440))
                    ,ec2attributes = "InstanceProfile='telemetry-spark-cloudformation-XYZ...'"
                    ,configfile="https://s3-us-west-2.amazonaws.com/telemetry-spark-emr-2/configuration/configuration.json"
                    ))
aws.options(releaselabel="emr-5.2.1")
```
In the above, we
- set the EC2 key and the SSH public key
- `loguri` is where logs are kept
- `s3bucket` is where mozaws keeps several scripts to install packages, run
  shell scripts etc.
- `ec2attributes` and `configfile` are used in the `create-cluster` invocation

Once `aws.init` has been called we are ready to call `aws` cli commands through
the corresponding mozaws API.
Typical usage then begins with creating a cluster, growing it, running
steps, uploading/downloading files, monitoring job progress and so on.
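As an overview (each call is covered in detail below), a typical session might look
like the following sketch; the setup-script URL is a placeholder:

```{r eval=FALSE}
## A sketch of a typical session; the args URL is a placeholder.
cl <- aws.clus.create(workers=1, spark=TRUE, wait=30)          # start a small cluster
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh', aws.options()$s3bucket)
                   , args="https://example.org/my.setup.sh"    # placeholder setup script
                   , name="Set up my environment", wait=TRUE)
cl <- aws.modify.groups(cl, 10, spot=0.8)                      # grow with 10 spot nodes
## ... run the analysis ...
aws.kill(cl)                                                   # no zombie clusters
```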
## Starting a cluster
Our workflow is best understood by examining a function I wrote to start a
cluster and install packages:
```{r eval=FALSE}
spark <- function(n=1, wait=10, spot=0.8)
{
    ## start with one on-demand worker; poll every `wait` seconds until it is up
    cl <- aws.clus.create(workers=1, spark=TRUE, ver=TRUE, applications=c("Spark","Hive","Hadoop")
                         ,wait=wait)
    if(identical("WAITING", cl$Status$State)){
        cat("Running the Step to add mozillametricstools code\n")
        spark.init(cl)
        spark.rhipe(cl)
        print(cl)
        ## add the remaining n-1 nodes as a spot instance group
        if(n>1) cl <- aws.modify.groups(cl, n-1, spot = spot)
        print(cl)
        while(TRUE){
            cl <- aws.clus.info(cl)
            ## the spot group is the one with a BidPrice
            ig <- Filter(function(s) !is.null(s$BidPrice), cl$InstanceGroups)
            if(length(ig)>0 && ig[[1]]$RunningInstanceCount >= (n-1)/3) break
            if(length(ig)>0) print(sprintf("Sleeping, since count of group is: %s", ig[[1]]$RunningInstanceCount))
            Sys.sleep(wait)
        }
    }
    cl
}
```
This is called as `spark(n=12)`. Let's see how it works.
```{r eval=FALSE}
cl <- aws.clus.create(workers=1, spark=TRUE, ver=TRUE, applications=c("Spark","Hive","Hadoop")
                     ,wait=wait)
```
This starts a cluster with 1 on-demand worker, installing Spark, Hive and
Hadoop. The output is verbose (`ver=TRUE`), and the call polls the cluster
status (via `describe-cluster`) every `wait` seconds until it is up.
The types of the master and worker nodes are taken from `aws.options`, e.g.
```{r eval=FALSE}
> aws.options()$inst.type
      worker       master
"c3.4xlarge" "c3.4xlarge"
```
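These defaults can presumably be changed the same way `releaselabel` was set during
initialization; a hedged sketch, assuming `aws.options` accepts `inst.type` in the
same named-vector form it returns:

```{r eval=FALSE}
## Assumed setter form: mirrors the named vector returned by aws.options()$inst.type
aws.options(inst.type = c(worker="c3.8xlarge", master="c3.4xlarge"))
```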
A limitation of `aws.clus.create` (see [^2] for the code) is that it requires one on-demand node. This
limitation could easily be fixed.
The call to `aws.clus.create` ends up as an `aws emr create-cluster` invocation that looks like
```
aws emr create-cluster --configurations https://s3-us-west-2.amazonaws.com/telemetry-spark-emr-2/configuration/configuration.json --applications Name=Spark Name=Hive Name=Hadoop --service-role EMR_DefaultRole --tags Owner='[email protected]' crtr='rmozaws-1' --visible-to-all-users --release-label 'emr-5.2.1' --log-uri 's3://mozilla-metrics/share/logs/' --name '[email protected] cluster: 3' --enable-debugging --ec2-attributes KeyName='20161025-dataops-dev',InstanceProfile='telemetry-spark-cloudformation-TelemetrySparkInstanceProfile-1SATUBVEXG7E3' --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=1,InstanceType=c3.4xlarge --bootstrap-actions Path='s3://telemetry-spark-emr-2/bootstrap/telemetry.sh',Args=['--public-key,ssh-rsa AAAAB...','--timeout,1440']
..........
```
and typing `cl` prints the following (this uses R's S3 `print` method [^5])
```
This Information as of: 23.69 minutes ago
Cluster ID : j-2UY92VI78CLME
Name : '[email protected] cluster: 3'
State : WAITING
Reason : Cluster ready after last step completed.
Started At : 2017-01-26 08:01:09
Message : Cluster ready after last step completed.
IP : ec2-XXX.compute.amazonaws.com
SSH : ssh [email protected] (assuming did aws.init(localpub=your-pub-key)
else ssh -i /Users/sguha/gdrive/mz/awsssh [email protected]
SOCKS : ssh -ND 8157 [email protected] (and use FoxyProxy for Firefox or SwitchySharp for Chrome)
JobTracker : http://XXX.us-west-2.compute.amazonaws.com:9026 (needs a socks)
Spark UI : http://localhost:8888 but first run ssh -L 8888:localhost:8888 [email protected]
Master Type : c3.4xlarge (and is running: TRUE)
Core Nodes : 1 of c3.4xlarge
```
## Waiting for the Cluster to Start
If you didn't use `wait` when starting your cluster, i.e. you started it
asynchronously, you can wait for it using `aws.clus.wait` [^3]. This function takes the cluster
object `cl` and a polling interval (e.g. 10 seconds) and calls `describe-cluster`.
If the call receives a throttling error (i.e. too many requests), it backs off,
similar to exponential backoff. Once the cluster has started or failed, the
updated `cl` object is returned.
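The back-off idea itself is simple; a minimal, generic sketch (not the package's
code), where `get.state` stands in for a `describe-cluster` call:

```{r eval=FALSE}
## Generic polling-with-back-off sketch; a throttling error is treated as NA.
wait.with.backoff <- function(get.state, sleep=10, max.sleep=300){
  repeat{
    state <- tryCatch(get.state(), error=function(e) NA)
    if(identical(state, "WAITING") || identical(state, "TERMINATED_WITH_ERRORS")) return(state)
    Sys.sleep(sleep)
    if(is.na(state)) sleep <- min(sleep*2, max.sleep)   # back off only after an error
  }
}
```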
## Has the Cluster Started Successfully?
Now we have a cluster that has finished starting; it could also have failed.
Hence the check
```{r eval=FALSE}
...
if(identical("WAITING",cl$Status$State)){
...
```
## Configurable: Running Steps (customizing the cluster/running remote jobs)
If the cluster is `WAITING` it is ready to be used. If this is the case, we now install some custom Python scripts and R packages.
```{r eval=FALSE}
spark.init(cl)
spark.rhipe(cl)
```
> **configurability** We can customize the environment that gets set up on our master
> node, and we can do everything ourselves, on an individual basis. If we need a
> certain package installed (e.g. when starting to use the Rmd stack), we can set it
> up quickly for our own clusters. `aws.step.run` is the key to this.
`spark.init` is a wrapper around the following mozaws call. This step installs
the mozillametricstools Python library and some other Python packages (e.g. feather).
```{r eval=FALSE}
> cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
                     , args="https://raw.githubusercontent.com/saptarshiguha/mozillametricstools/master/common/spark.init.step.sh"
                     , name="Clone Our Repo"
                     , wait=TRUE)
Running step with Id (see logs at /mnt/var/log/hadoop/steps/s-3IJ8LT9AGTF1K) : s-3IJ8LT9AGTF1K
```
Steps can be used to submit PySpark/Spark/R jobs to the cluster and have them
run. A step can either be waited on (`wait=TRUE`) or monitored afterwards. The
call `aws.step.run` [^4] is a wrapper around `emr add-steps`. We can run a
job, wait for the output and then upload the log files (standard error, standard output
etc.). The following function shows how it can be done (given the step id).
```{r eval=FALSE}
uploadLogsToS3 <- function(stepID, JNAME){
    ## shell script that tars the step's log directory and copies it to S3
    zipper <- sprintf("#!/bin/sh
cd /mnt/var/log/hadoop/steps/%s
tar cvfz mylog-%s.tar.zip .
aws s3 cp mylog-%s.tar.zip s3://mozilla-metrics/user/sguha/tmp/
", stepID, JNAME, JNAME)
    x <- tempfile()
    writeLines(zipper, x)
    cat(sprintf("Uploading %s to s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh\n", x, JNAME))
    system(sprintf("aws s3 cp %s s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh", x, JNAME))
    ## `cl` is the cluster object from the enclosing environment
    cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh', aws.options()$s3bucket)
                       , args=sprintf("s3://mozilla-metrics/user/sguha/tmp/makeZip%s.sh", JNAME)
                       , name=sprintf("Zipping Logs %s", JNAME)
                       , wait=TRUE)
    print(sprintf("logs can be found at s3://mozilla-metrics/user/sguha/tmp/mylog-%s.tar.zip", JNAME))
}
```
There are multiple ways to do the above (e.g. running a remote command on the
cluster via SSH); this is just one.
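For completeness, a hedged sketch of the SSH variant. It assumes the master's public
DNS name is available on the cluster object (the field name `MasterPublicDnsName`,
matching the `describe-cluster` JSON, is an assumption), and `stepID` is the step of
interest:

```{r eval=FALSE}
## Run the tar-and-upload commands directly on the master over SSH.
## cl$MasterPublicDnsName is assumed; substitute however you obtain the master address.
cmd <- sprintf("cd /mnt/var/log/hadoop/steps/%s && tar cvfz /tmp/mylog.tar.gz . && aws s3 cp /tmp/mylog.tar.gz s3://mozilla-metrics/user/sguha/tmp/", stepID)
system2("ssh", c(sprintf("hadoop@%s", cl$MasterPublicDnsName), shQuote(cmd)))
```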
The next step, `spark.rhipe`, installs the RHIPE R package and pandoc.
```{r eval=FALSE}
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
                   , args="https://raw.githubusercontent.com/saptarshiguha/mozaws/master/bootscriptsAndR/setupRhipe.sh"
                   , name="Install RHIPE"
                   , wait=TRUE)
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh',aws.options()$s3bucket)
                   , args="https://raw.githubusercontent.com/saptarshiguha/mozaws/master/bootscriptsAndR/installlPandoc.sh"
                   , name="Install Pandoc (might take some time)"
                   , wait=TRUE)
```
All of these steps download the shell script given in the `args` parameter and
execute it. To see what they do, follow the URLs.

> It is important to be able to write to S3. The function `uploadLogsToS3` above uploads
> its script to S3, though uploading to a public website works too.
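Rolling your own step follows the same pattern; a minimal sketch (the package being
installed and the S3 script location are placeholders):

```{r eval=FALSE}
## Hypothetical example: install an extra R package on the master via a custom step.
myscript <- tempfile(fileext=".sh")
writeLines(c("#!/bin/sh",
             "sudo R -e 'install.packages(\"data.table\", repos=\"https://cloud.r-project.org\")'"),
           myscript)
system(sprintf("aws s3 cp %s s3://mozilla-metrics/user/sguha/tmp/install.extra.sh", myscript))
cl <- aws.step.run(cl, script=sprintf('s3://%s/run.user.script.sh', aws.options()$s3bucket)
                   , args="s3://mozilla-metrics/user/sguha/tmp/install.extra.sh"
                   , name="Install extra R package", wait=TRUE)
```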
## Growing the cluster
> **modifying running clusters** - we can optimize the efficiency of
> our cluster usage by starting and killing nodes as needed. E.g. run an intensive
> data pull with many nodes, then kill all but a few to develop an
> analysis. Rather than start multiple clusters, we can adjust the size of our
> clusters.
We now need to add more nodes. We do this using
```{r eval=FALSE}
...
aws.modify.groups(cl, n-1, spot = spot)
...
```
In the above call we ask for $n-1$ nodes because one node has already been created
on demand (a bug). The call returns immediately, so we need to monitor the cluster to see whether
the nodes have started. `aws.modify.groups` [^6] wraps `modify-instance-groups`
and `add-instance-groups`. It can add groups of nodes (spot or on-demand), change
their size and delete them.
```{r eval=FALSE}
> aws.modify.groups(cl, 10)
Using a spot price of 0.1764
```
The monitoring loop (note that a back-off algorithm is not used here, which is a bug):
```{r eval=FALSE}
while(TRUE){
    cl <- aws.clus.info(cl)
    ig <- Filter(function(s) !is.null(s$BidPrice),cl$InstanceGroups)
    if(length(ig)>0 && ig[[1]]$RunningInstanceCount>=(n-1)/3) break
    if(length(ig)>0) print(sprintf("Sleeping, since count of group is: %s",ig[[1]]$RunningInstanceCount))
    Sys.sleep(wait)
}
```
We see a new command here: `aws.clus.info` [^7]. It wraps `describe-cluster` and
`list-steps` and always retrieves the latest information.
In the above code, we
- loop
- get the latest cluster information
- filter for the instance groups that are spot nodes (`!is.null(s$BidPrice)`)
  (this is not generalized, but it works here because we have only one group with
  spot nodes; groups can be named, which would make the search easier)
- if at least $(n-1)/3$ nodes have started, we break (since the others will
  start very soon)
- otherwise we wait (we should ideally use some sort of back-off algorithm to prevent
  AWS throttling; see the sketch below)
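A version of the loop with a simple back-off folded in might look like this (a
sketch, not part of the package):

```{r eval=FALSE}
## Same loop as above, but the sleep doubles (up to a cap) while we are still waiting.
sleep <- wait
while(TRUE){
    cl <- aws.clus.info(cl)
    ig <- Filter(function(s) !is.null(s$BidPrice), cl$InstanceGroups)
    if(length(ig)>0 && ig[[1]]$RunningInstanceCount >= (n-1)/3) break
    if(length(ig)>0) print(sprintf("Sleeping %ss, count of group is: %s", sleep, ig[[1]]$RunningInstanceCount))
    Sys.sleep(sleep)
    sleep <- min(sleep*2, 120)   # back off to avoid throttling describe-cluster
}
```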
Once this loop ends, our nodes have started. Printing the cluster object returned
by `aws.clus.info` now looks like
```
This Information as of: 0.79 seconds ago
Cluster ID : j-2A4P0QLGUXTBC
Name : '[email protected] cluster: 4'
State : WAITING
Reason : Cluster ready after last step completed.
Started At : 2017-01-27 12:06:57
Message : Cluster ready after last step completed.
IP : X.compute.amazonaws.com
SSH : ssh [email protected] (assuming did aws.init(localpub=your-pub-key)
else ssh -i /Users/sguha/gdrive/mz/awsssh [email protected]
SOCKS : ssh -ND 8157 [email protected] (and use FoxyProxy for Firefox or SwitchySharp for Chrome)
JobTracker : http://ec2-54-218-105-169.us-west-2.compute.amazonaws.com:9026 (needs a socks)
Spark UI : http://localhost:8888 but first run ssh -L 8888:localhost:8888 [email protected]
Master Type : c3.4xlarge (and is running: TRUE)
Core Nodes : 1 of c3.4xlarge
Number of Instance Groups: 1
ID:ig-3AHNJ80GBD3FJ, name: '' state:RUNNING requested:11 (at $0.8), running: 11
https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#cluster-details:j-2A4P0QLGUXTBC
```
## Sending and getting files
The commands `aws.get` and `aws.send` (see [^8] and [^9]) are wrappers around
`scp` (soon to be changed to `rsync`) for downloading and uploading files, with sane
defaults (e.g. the default download destination is the home folder).
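In essence these wrappers issue `scp` commands against the master node; a minimal
sketch of that idea (the `MasterPublicDnsName` field and the private-key path are
assumptions, and the real functions have their own signatures, see [^8] and [^9]):

```{r eval=FALSE}
## Copy a local script up to the master's home directory and fetch a result back.
master <- cl$MasterPublicDnsName          # assumed field name
system(sprintf("scp -i ~/mz/awsssh myanalysis.R hadoop@%s:~/", master))
system(sprintf("scp -i ~/mz/awsssh hadoop@%s:~/results.Rdata .", master))
```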
## Monitoring a Job
> **monitoring** the status monitor includes all the relevant information. It is very useful to have
> the number of records and the data size, so that we have an idea of what's happening if the tasks
> aren't progressing (e.g. it's reading really big files, or it's doing a
> shuffle). These aren't present in the current Jupyter UI.
Now that we've started a cluster and are merrily running jobs, we would like to
monitor their progress. This function is a work in progress; it has the huge drawback of
making repeated `ssh` calls to the cluster. One idea for a replacement is to
- encourage the use of a specific AWS EMR ssh key
- using this key, open a reverse ssh tunnel into the cluster
- run the monitoring script on the server
The code for this is at [^10]. It is called as
```{r eval=FALSE}
monitorCurrentSparkApplication(cl)
```
And its output looks like
```
13:00:41> monitorCurrentSparkApplication(cl)
App:application_1485547995945_0007 Job[id:2, name:'csv at NativeMethodAccessorImpl.java:-2'] started at: 2017-01-27 21:01:32, duration: 0.86 min
Tasks(c,f/all,%): 1031,0/1803,57.2% Stages(c,f/all,%): 0,0/2,0%
stageId name status tActive tCompleted tFailed tInputBytes tOutputBytes tInputRecords tOutputRecords shufflesBytes
1: 3 csv at NativeMethodAccessorImpl.java:-2 PENDING 0 0 0 0 bytes 0 bytes 0 0 0 bytes
2: 2 csv at NativeMethodAccessorImpl.java:-2 ACTIVE 192 1060 0 31.6 GB 0 bytes 1483941478 0 0 bytes
```
What we like about it:
- the number of tasks and stages
- total input records and bytes
- output records and bytes
- percentages complete
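One lighter-weight direction (a sketch under assumptions, not what
`monitorCurrentSparkApplication` currently does) is to read the Spark REST API
through the same tunnel used for the Spark UI, assuming `ssh -L
8888:localhost:8888 hadoop@...` is running and that the UI on port 8888 exposes the
standard `/api/v1` endpoints:

```{r eval=FALSE}
library(jsonlite)
## Assumes the Spark UI is tunnelled to localhost:8888 and serves the standard REST API.
apps   <- fromJSON("http://localhost:8888/api/v1/applications")
app    <- apps$id[1]                      # most recent application
stages <- fromJSON(sprintf("http://localhost:8888/api/v1/applications/%s/stages", app))
stages[, c("stageId", "name", "status", "numCompleteTasks", "inputBytes", "outputBytes")]
```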
## No Zombie Clusters (killing clusters)
The command `aws.kill(cl)` (which wraps `terminate-clusters`) kills the
cluster. How can we be sure all clusters have been killed? The command
`aws.clus.list` [^11] (which wraps `list-clusters`) lists all clusters and defaults to
active clusters.
> **scriptability** all of our interactions with the cluster can be tied into our R session
> (e.g. `.First` and `.Last`). You could do this in bash too, but it's more of a pain,
> and it's something you can't do at all with a web interface.
So, at the end of a session, we can
- get this list, passing the `since` parameter, which is a date;
  e.g. `Sys.Date() - 5` returns all active clusters from the last 5 days
- select mine
- kill them all
```{r eval=FALSE}
> y <- aws.clus.list(active=TRUE, since=Sys.Date()-2)
> y[[1]]
id name nhrs started state stageChangeMessage elapsed
1: j-PB5VY5J6FI6Y 4444b809-8442-43d3-bc9d-ead0bd919692 0 2017-01-27 12:08:07 BOOTSTRAPPING Running bootstrap actions 6.067458 mins
2: j-2A4P0QLGUXTBC [email protected] cluster: 4 0 2017-01-27 12:06:57 BOOTSTRAPPING Running bootstrap actions 7.244208 mins
3: j-3M5O9010L5768 6ff3f66f-5970-44e2-b1c9-6a48d7408896 64 2017-01-27 11:53:23 RUNNING Running step 20.801425 mins
4: j-1KHV1UURAF86 9ad7056c-d948-493b-bb0d-72178d1104a2 64 2017-01-27 11:48:08 RUNNING Running step 26.049925 mins
5: j-NU9XI4DFFQBN 1d62ce9b-1e4b-4aae-ba68-4764bed5712b 32 2017-01-27 11:47:49 WAITING Cluster ready to run steps. 26.374041 mins
6: j-1Y552WILUUEGD 80b52010-e258-4823-95c8-c56edb27f572 320 2017-01-27 10:32:35 WAITING Cluster ready to run steps. 101.608275 mins
7: j-H1YJIRH7148W 2499e517-4feb-44fc-b8a2-b5ec95a576eb 1120 2017-01-27 09:59:43 WAITING Cluster ready to run steps. 134.467441 mins
8: j-2LXIVWYT2R4I3 b6f52377-117a-4b76-abe7-96bfb4578229 1376 2017-01-27 09:25:47 WAITING Cluster ready to run steps. 168.414441 mins
9: j-E93PRSE5Y14T 3b211770-fc8e-4e10-b379-7e7e13865d2f 864 2017-01-27 09:24:47 WAITING Cluster ready to run steps. 169.405091 mins
10: j-2IWLGGHD3Z0RK ad57c3a5-d00e-4ccc-9c39-ce96b02c77c5 512 2017-01-27 08:51:45 WAITING Cluster ready to run steps. 202.447691 mins
11: j-3ED2OFB15A26T telemetry-presto-20170127-1485532855 1312 2017-01-27 08:01:03 WAITING Cluster ready to run steps. 253.141308 mins
12: j-1KAZ4795XXZJW c7530877-b1e3-4fee-8901-a3891c30aa41 1440 2017-01-27 07:51:54 WAITING Cluster ready to run steps. 262.287358 mins
13: j-1PSPVD1O4SNAV 6faf7b95-296d-4b59-8f9f-eea402a93b86 160 2017-01-27 07:35:34 WAITING Cluster ready to run steps. 278.632908 mins
14: j-AV5ZWKKC2NC5 ebafe0b1-ecba-45b9-b95f-9f4d37c073be 11808 2017-01-26 12:42:39 WAITING Cluster ready to run steps. 1411.540741 mins
15: j-9ETW1T9CUJGG 8d651125-7a39-48cf-8788-1e3d513d220d 2976 2017-01-26 12:28:01 WAITING Cluster ready to run steps. 1426.177591 mins
> sapply(y[grepl("sguha",name),id],function(cid) aws.kill(as.awsCluster(cid)))
```
This routine can be kept in R's `.Last` function, which runs when R quits.
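For instance, a sketch of such a `.Last`, simply wrapping the two calls shown above
(and assuming the same "sguha" naming convention for my clusters):

```{r eval=FALSE}
## Kill any of my still-active clusters when the R session ends.
.Last <- function(){
    y <- aws.clus.list(active=TRUE, since=Sys.Date()-2)
    try(sapply(y[grepl("sguha", name), id], function(cid) aws.kill(as.awsCluster(cid))), silent=TRUE)
}
```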
## Other cool things
We like data, so we also like seeing spot prices. The following function,
`aws.spot.price` [^12], takes the default machine type and region (from `aws.options`)
and retrieves the spot price history. It wraps `ec2 describe-spot-price-history`.
```{r cache=TRUE,dev='svg',fig.cap="Spot Price History for 'c3.4xlarge', Linux systems", fig.width=9,fig.height=6,fig.align='center'}
library(lattice)   # for xyplot
y <- aws.spot.price()
xyplot( SpotPrice ~ Timestamp,groups=AvailabilityZone, type=c('p','smooth','g'),auto.key=TRUE,data=y,xlab='Time',ylab='$')
```
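The same data can feed a bid choice; a sketch (the 1.2 multiplier is arbitrary, and it
assumes the columns shown in the plot above, with `SpotPrice` being numeric):

```{r eval=FALSE}
## Pick a bid a bit above the recent median spot price across availability zones.
y   <- aws.spot.price()
bid <- round(1.2 * median(y$SpotPrice), 4)
cl  <- aws.modify.groups(cl, 10, spot = bid)
```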
## AWS EMR commands used
- list-clusters
- terminate-clusters
- describe-cluster
- list-steps
- describe-step
- add-steps
- modify-instance-groups
- add-instance-groups
- create-cluster

We also use `aws s3` for deleting output, retrieving files etc.
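For example (the file name and output prefix below are placeholders):

```{r eval=FALSE}
## Fetch a log archive and clean out an old output prefix.
system("aws s3 cp s3://mozilla-metrics/user/sguha/tmp/mylog-myjob.tar.zip .")
system("aws s3 rm s3://mozilla-metrics/user/sguha/tmp/old-output/ --recursive")
```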
## References
[^1]: https://github.com/saptarshiguha/mozaws
[^2]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L147
[^3]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L298
[^4]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L468
[^5]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L350
[^6]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L506
[^7]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L329
[^8]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L426
[^9]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L411
[^10]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/tracker.R#L199
[^11]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L20
[^12]: https://github.com/saptarshiguha/mozaws/blob/master/mozaws/R/aws.r#L489