Skip to content

Instantly share code, notes, and snippets.

@vitillo
Last active August 29, 2015 14:11
Show Gist options
  • Select an option

  • Save vitillo/d779db7b91e0fd793ad4 to your computer and use it in GitHub Desktop.

Select an option

Save vitillo/d779db7b91e0fd793ad4 to your computer and use it in GitHub Desktop.
Telemetry setup scripts for Spark on EMR
# Pin HOME to the hadoop user's home directory, then load that user's shell
# configuration into this script.
HOME="/home/hadoop"
. "${HOME}/.bashrc"
# Error message
# Write "Error: <message>" to standard error; nothing goes to stdout.
# Arguments: $1 - text of the message
error_msg() {
  printf 'Error: %s\n' "$1" >&2
}
# Parse arguments
# Recognised options, each followed by its value:
#   --job-name NAME, --notebook S3_URL, --data-bucket BUCKET
# Parsing stops at the first argument that does not start with "-".
until [ $# -eq 0 ]; do
  case "$1" in
    --job-name)    shift; JOB_NAME="$1" ;;
    --notebook)    shift; NOTEBOOK="$1" ;;
    --data-bucket) shift; DATA_BUCKET="$1" ;;
    -*)
      # do not exit out, just note failure
      error_msg "unrecognized option: $1"
      ;;
    *)
      break
      ;;
  esac
  # Drop the option value (or the unrecognised option) just handled.
  shift
done
# All three options are mandatory. Say which contract was violated instead of
# exiting silently, so the failure is visible in the step's stderr.
if [ -z "$JOB_NAME" ] || [ -z "$NOTEBOOK" ] || [ -z "$DATA_BUCKET" ]; then
  error_msg "--job-name, --notebook and --data-bucket are all required"
  # 255 is the status bash reported for the former out-of-range `exit -1`;
  # spelling it out keeps the exit status unchanged for callers.
  exit 255
fi
# Job log path, relative to the analyses directory; the timestamp keeps
# successive runs of the same job apart.
LOG="logs/${JOB_NAME}.$(date +%Y%m%d%H%M%S).log"
# The same file as addressed from inside the output/ subdirectory.
PLOG="../${LOG}"
# Destination prefix for everything this job uploads.
S3_BASE="s3://${DATA_BUCKET}/${JOB_NAME}"

# Create the work area and its logs/ and output/ subdirectories.
if mkdir -p "$HOME/analyses"; then
  cd "$HOME/analyses"
fi
mkdir -p logs output
# Run notebook
# Download the notebook next to output/, then execute it with runipy from
# inside output/ so that files the notebook writes land there. The executed
# copy of the notebook is written into output/ as well. All runipy output is
# appended to the job log.
aws s3 cp "$NOTEBOOK" .
cd output
NOTEBOOK_FILE="${NOTEBOOK##*/}"
echo "Beginning job $JOB_NAME ..." >> "$PLOG"
runipy "../$NOTEBOOK_FILE" "$NOTEBOOK_FILE" --pylab >> "$PLOG" 2>&1
# Capture the status immediately: any later command (even echo) overwrites $?.
JOB_STATUS=$?
echo "Finished job $JOB_NAME" >> "$PLOG"
echo "'$NOTEBOOK_FILE' exited with code $JOB_STATUS" >> "$PLOG"
# Upload output files and notebook
# Every regular file below the current directory is copied to
# $S3_BASE/data/<relative path>. Files ending in .gz are uploaded with
# "--content-encoding gzip". Names are read NUL-delimited and the command is
# kept in an array, so file names containing spaces, quotes or backslashes are
# passed to aws verbatim rather than being re-parsed by the shell.
find . -type f -print0 | while IFS= read -r -d '' f; do
  # Remove the leading "./"
  f=${f#./}
  printf '%s\n' "$f"
  upload_cmd=(aws s3 cp "./$f" "$S3_BASE/data/$f")
  if [[ "$f" == *.gz ]]; then
    echo "adding 'Content-Encoding: gzip' for $f" >> "$PLOG"
    upload_cmd+=(--content-encoding gzip)
  else
    echo "Not adding 'Content-Encoding' header for $f" >> "$PLOG"
  fi
  echo "Running: ${upload_cmd[*]}" >> "$PLOG"
  # stdout and stderr of the upload both go to the job log.
  "${upload_cmd[@]}" >> "$PLOG" 2>&1
done
# Upload log
# Step back out of output/, compress the job log in place and publish the
# archive under $S3_BASE/logs/ as gzip-encoded plain text.
cd ..
gzip "$LOG"
LOG_ARCHIVE="${LOG}.gz"
aws s3 cp "$LOG_ARCHIVE" "$S3_BASE/logs/${LOG_ARCHIVE##*/}" --content-type "text/plain" --content-encoding gzip
# Install command-line tools; jq is needed right below to read the node's
# JSON info files.
sudo yum -y install git jq htop tmux

# Cluster description read from the job-flow info file.
JOB_FLOW_INFO=/mnt/var/lib/info/job-flow.json
INSTANCES=$(jq .instanceCount "$JOB_FLOW_INFO")
FLOWID=$(jq -r .jobFlowId "$JOB_FLOW_INFO")

# Default executor count: the instance count when there is more than one
# instance, otherwise 1.
# NOTE(review): this used to be spelled "$INSTANCES>1?$INSTANCES:2 - 1". The
# subtraction binds to the else-branch only, so the value is as written here.
# If "max(instances, 2) - 1" was intended, confirm before changing: a 2-node
# cluster would then get 1 executor and be switched to local mode further down.
EXECUTORS=$(( $INSTANCES > 1 ? $INSTANCES : 1 ))
EXECUTOR_CORES=$(nproc)

# Largest YARN container size, taken from the <value> on the matching line of
# yarn-site.xml.
MAX_YARN_MEMORY=$(grep -e "yarn\.scheduler\.maximum-allocation-mb" /home/hadoop/conf/yarn-site.xml | sed 's/.*<value>\(.*\).*<\/value>.*/\1/g')

# Executor heap in MB: container size minus 1024 and 384, less 7% of that
# remainder, truncated to an integer.
# NOTE(review): presumably 384 MB / 7% mirror Spark's YARN memory overhead and
# 1024 MB is headroom for other processes - confirm.
USABLE_MB_EXPR="($MAX_YARN_MEMORY - 1024 - 384)"
EXECUTOR_MEMORY="$(bc <<< "$USABLE_MB_EXPR - $USABLE_MB_EXPR * 0.07" | cut -d. -f1)M"
DRIVER_MEMORY="$EXECUTOR_MEMORY"
HOME="/home/hadoop"
# Error message
# Report a problem on standard error, prefixed with "Error: ".
#   $1 - text of the message
error_msg() {
  printf 'Error: %s\n' "$1" 1>&2
}
# Check for master node
# IS_MASTER stays "true" when the instance info file is absent; otherwise it
# takes whatever jq prints for the file's .isMaster field.
IS_MASTER=true
INSTANCE_INFO=/mnt/var/lib/info/instance.json
if [ -f "$INSTANCE_INFO" ]; then
  IS_MASTER=$(jq .isMaster "$INSTANCE_INFO")
fi
# Parse arguments
# Each recognised option takes one value and overrides the default computed
# earlier. The two memory options get a "g" suffix appended to the value given.
# Parsing stops at the first argument that does not start with "-".
until [ $# -eq 0 ]; do
  case "$1" in
    --num-executors)   shift; EXECUTORS="$1" ;;
    --executor-cores)  shift; EXECUTOR_CORES="$1" ;;
    --executor-memory) shift; EXECUTOR_MEMORY="${1}g" ;;
    --driver-memory)   shift; DRIVER_MEMORY="${1}g" ;;
    --public-key)      shift; PUBLIC_KEY="$1" ;;
    --timeout)         shift; TIMEOUT="$1" ;;
    -*)
      # do not exit out, just note failure
      error_msg "unrecognized option: $1"
      ;;
    *)
      break
      ;;
  esac
  # Drop the option value (or the unrecognised option) just handled.
  shift
done
# Setup Spark
# Hand /mnt to the hadoop user (it is later configured as spark.local.dir).
sudo chown hadoop:hadoop /mnt

# Force Python 2.7
# Replace the system-wide python and pip entry points with links to the 2.7
# binaries.
sudo rm /usr/bin/python /usr/bin/pip
sudo ln -s /usr/bin/python2.7 /usr/bin/python
sudo ln -s /usr/bin/pip-2.7 /usr/bin/pip

# Setup Python
PYTHON_PACKAGES=(
  py4j python_moztelemetry requests boto pyliblzma numpy pandas
  ipython==2.4.1 pyzmq jinja2 tornado ujson statsmodels runipy plotly
  montecarlino
)
sudo pip install "${PYTHON_PACKAGES[@]}"

# Fix empty backports.ssl-match-hostname package
# `yes` answers pip's confirmation prompt; reinstall only if the uninstall
# succeeded.
if sudo /usr/bin/yes | sudo pip uninstall backports.ssl_match_hostname; then
  sudo pip install backports.ssl_match_hostname
fi
# Append one public key line to $HOME/.ssh/authorized_keys.
# Arguments: $1 - the key, written verbatim (no word splitting or globbing)
add_public_key() {
  printf '%s\n' "$1" >> "$HOME/.ssh/authorized_keys"
}

# Add public key
if [ -n "$PUBLIC_KEY" ]; then
  add_public_key "$PUBLIC_KEY"
fi

# Schedule shutdown at timeout
# TIMEOUT is passed to shutdown as "+<value>"; the command is put in the
# background so the rest of the setup continues immediately.
if [ -n "$TIMEOUT" ]; then
  sudo shutdown -h "+$TIMEOUT" &
fi
# Continue only if master node
# Everything below configures the interactive Spark/IPython environment, so
# nodes whose IS_MASTER is exactly "false" stop here with a success status.
case "$IS_MASTER" in
  false) exit 0 ;;
esac
# Configure environment variables
# Append the Spark environment to ~/.bashrc and reload it into this shell.
# Values such as $HOME and $DRIVER_MEMORY are expanded now, so .bashrc receives
# literal paths and sizes.
BASHRC="$HOME/.bashrc"

# Settings shared by both submit modes.
SPARK_COMMON_CONF="--conf spark.local.dir=/mnt --conf spark.akka.frameSize=500"
if [ "$EXECUTORS" -eq 1 ]; then
  # A single executor means Spark runs in local mode on this node.
  SUBMIT_ARGS="--master local[*] $SPARK_COMMON_CONF"
else
  SUBMIT_ARGS="--master yarn --deploy-mode client --num-executors $EXECUTORS --executor-memory $EXECUTOR_MEMORY --executor-cores $EXECUTOR_CORES $SPARK_COMMON_CONF"
fi

{
  cat << EOF
# Spark configuration
export PYTHONPATH=$HOME/spark/python/
export SPARK_HOME=$HOME/spark
export _JAVA_OPTIONS="-Dlog4j.configuration=file:///home/hadoop/spark/conf/log4j.properties -Xmx$DRIVER_MEMORY"
EOF
  printf 'export PYSPARK_SUBMIT_ARGS="%s"\n' "$SUBMIT_ARGS"
} >> "$BASHRC"

. "$BASHRC"
# Setup IPython
# Create the default profile, then drop in a startup file that runs pyspark's
# shell.py (located through SPARK_HOME) whenever IPython starts.
ipython profile create
IPYTHON_STARTUP_DIR="$HOME/.ipython/profile_default/startup"
cat > "$IPYTHON_STARTUP_DIR/00-pyspark-setup.py" << 'EOF'
import os
spark_home = os.environ.get('SPARK_HOME', None)
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
EOF
# Dump Spark logs to a file
# Overwrites $SPARK_HOME/conf/log4j.properties with a configuration that sends
# the root logger (INFO and above) to a single file appender named FILE, with
# Jetty limited to WARN. The heredoc delimiter is unquoted, so $HOME in the
# appender path is expanded when the file is written. Every line between the
# cat and the closing EOF, including those starting with "#", is file content.
cat << EOF > $SPARK_HOME/conf/log4j.properties
# Initialize root logger
log4j.rootLogger=INFO, FILE
# Set everything to be logged to the console
log4j.rootCategory=INFO, FILE
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
# Set the appender named FILE to be a File appender
log4j.appender.FILE=org.apache.log4j.FileAppender
# Change the path to where you want the log file to reside
log4j.appender.FILE.File=$HOME/spark.log
# Prettify output a bit
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF
# Setup plotly
# The credentials are fetched only when ~/.plotly was newly created here.
PLOTLY_DIR="$HOME/.plotly"
if mkdir "$PLOTLY_DIR"; then
  aws s3 cp s3://telemetry-spark-emr/plotly_credentials "$PLOTLY_DIR/.credentials"
fi

# Fetch the example notebook into the analyses directory and start the
# notebook server in the background from there.
ANALYSES_DIR="$HOME/analyses"
if mkdir -p "$ANALYSES_DIR"; then
  cd "$ANALYSES_DIR"
fi
EXAMPLE_NOTEBOOK_URL="https://gist.githubusercontent.com/vitillo/e1813025e7d26d640c80/raw/79245cdabe4207a6a29548f8c3192ed180a6f9f5/Telemetry%20Hello%20World.ipynb"
wget "$EXAMPLE_NOTEBOOK_URL"
ipython notebook &
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment