Gist TimurFayruzov/c2ec9ddcd49eaef71f89f78ba650a72b
Created June 29, 2016 17:42
Setup for running a Flink application on EMR
bootstrap.json
[
  {
    "Name": "Ship Flink runtime to cluster",
    "Path": "s3://<your_bucket>/flink/ship_flink_runtime.sh"
  },
  {
    "Name": "Ship application to cluster",
    "Path": "s3://<your_bucket>/flink/ship_app.sh"
  }
]
configurations.json
[
  {
    "Classification": "hadoop-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-1.8.0",
          "HADOOP_CLASSPATH": "$HADOOP_CLASSPATH:/usr/share/aws/emr/s3-dist-cp/lib/"
        }
      }
    ],
    "Properties": {}
  }
]
instance_groups.json
[
  {
    "InstanceGroupType": "MASTER",
    "InstanceCount": 1,
    "InstanceType": "m1.large"
  },
  {
    "InstanceGroupType": "CORE",
    "InstanceCount": 1,
    "InstanceType": "m1.large"
  }
]
run_flink.json
[
  {
    "Type": "CUSTOM_JAR",
    "Name": "Start Flink",
    "ActionOnFailure": "CONTINUE",
    "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
    "Args": ["s3://wp-data-west-2/flink/start_flink.sh"]
  },
  {
    "Type": "CUSTOM_JAR",
    "Name": "Run Flink",
    "ActionOnFailure": "CONTINUE",
    "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
    "Args": ["s3://wp-data-west-2/flink/run_flink.sh"]
  }
]
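run_flink.json points at the author's own bucket, wp-data-west-2, while bootstrap.json uses the `<your_bucket>` placeholder. A minimal sketch, assuming you would rather generate the steps file from one bucket name than edit it by hand; `render_steps_json` is a hypothetical helper, and the step layout is copied from run_flink.json.

```shell
#!/bin/bash
# Sketch: print run_flink.json for a given bucket.
# render_steps_json is a hypothetical helper; the two steps mirror run_flink.json.
set -euo pipefail

render_steps_json() {
  local bucket="$1"
  local runner="s3://elasticmapreduce/libs/script-runner/script-runner.jar"
  # Unquoted heredoc delimiter, so ${bucket} and ${runner} are expanded.
  cat <<EOF
[
  {
    "Type": "CUSTOM_JAR",
    "Name": "Start Flink",
    "ActionOnFailure": "CONTINUE",
    "Jar": "${runner}",
    "Args": ["s3://${bucket}/flink/start_flink.sh"]
  },
  {
    "Type": "CUSTOM_JAR",
    "Name": "Run Flink",
    "ActionOnFailure": "CONTINUE",
    "Jar": "${runner}",
    "Args": ["s3://${bucket}/flink/run_flink.sh"]
  }
]
EOF
}
```

Usage: `render_steps_json my-bucket > run_flink.json`. The two `aws s3 cp` lines that upload start_flink.sh and run_flink.sh in the launch script would need to target the same bucket.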
run_flink.sh
#!/bin/bash
set -e
HADOOP_CONF_DIR=/etc/hadoop/conf /home/hadoop/flink-1.0.0/bin/flink run -p 2 /home/hadoop/<your_app>.jar
ship_app.sh
#!/bin/bash
cd /home/hadoop
aws s3 cp s3://<your_bucket>/flink/<your_app>.jar .
ship_flink_runtime.sh
#!/bin/bash
cd /home/hadoop
aws s3 cp s3://<your_bucket>/flink/flink-1.0.0-bin-hadoop2-scala_2.11.tgz .
tar xzf flink-1.0.0-bin-hadoop2-scala_2.11.tgz
rm flink-1.0.0-bin-hadoop2-scala_2.11.tgz
## In case you need to set JVM parameters for your app
#printf "env.java.opts: \"-D<set_your_parameter>\"\n" >> /home/hadoop/flink-1.0.0/conf/flink-conf.yaml
## In case you need environment variables on your TaskManagers
#printf "yarn.taskmanager.env.<name>: <value>\n" >> /home/hadoop/flink-1.0.0/conf/flink-conf.yaml
# Had to set this to prevent bootstrap failures due to timeouts
printf "akka.ask.timeout: 200 s\n" >> /home/hadoop/flink-1.0.0/conf/flink-conf.yaml
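ship_flink_runtime.sh appends settings to flink-conf.yaml with `printf ... >>`, which adds a new line even when the key is already set, for example when the bundled flink-conf.yaml already defines it or the lines are run again without re-extracting the tarball. A minimal sketch of an idempotent alternative, assuming a hypothetical helper `set_flink_conf` (not part of Flink or EMR) that drops any existing line for the key before appending the new value:

```shell
#!/bin/bash
# Sketch: set KEY to VALUE in a Flink config file without leaving duplicates.
# set_flink_conf is a hypothetical helper, not part of Flink or EMR.
set -euo pipefail

set_flink_conf() {
  local key="$1" value="$2" file="$3"
  local tmp
  tmp="$(mktemp)"
  touch "$file"
  # Keep every line that does not start with "KEY:" (plain prefix match, no regex).
  awk -v prefix="${key}:" 'index($0, prefix) != 1' "$file" > "$tmp"
  # Append the setting in Flink's "key: value" format.
  printf '%s: %s\n' "$key" "$value" >> "$tmp"
  # Rewrite the original file in place so its permissions are kept.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

Usage: `set_flink_conf akka.ask.timeout "200 s" /home/hadoop/flink-1.0.0/conf/flink-conf.yaml`. Commented-out lines such as `# akka.ask.timeout: ...` are left alone because only lines that begin with the key are removed.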
Launch script (uploads the scripts to S3, then creates the cluster):
#!/bin/bash
# Upload bootstrap scripts
aws s3 cp ship_flink_runtime.sh s3://<your_bucket>/flink/ship_flink_runtime.sh
aws s3 cp ship_app.sh s3://<your_bucket>/flink/ship_app.sh
# Upload the scripts that start Flink and run the app
aws s3 cp start_flink.sh s3://wp-data-west-2/flink/start_flink.sh
aws s3 cp run_flink.sh s3://wp-data-west-2/flink/run_flink.sh
aws emr create-cluster \
  --release-label emr-4.5.0 \
  --configurations file://./configurations.json \
  --use-default-roles \
  --instance-groups file://./instance_groups.json \
  --name "my-flink-app" \
  --tags application="my-flink-app" \
  --applications Name=Hadoop \
  --enable-debugging \
  --log-uri s3://<log_bucket>/elasticmapreduce/ \
  --ec2-attributes KeyName=<AWS_key> \
  --bootstrap-actions file://./bootstrap.json \
  --steps file://./run_flink.json \
  "$@"
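The files in this gist still contain placeholders such as `<your_bucket>`, `<your_app>`, `<log_bucket>` and `<AWS_key>`. A minimal pre-flight sketch that refuses to continue while any of them is left, assuming all the files sit in one directory; `check_placeholders` is a hypothetical helper, and the placeholder list is taken from this gist.

```shell
#!/bin/bash
# Sketch: fail if any .json or .sh file under DIR still contains a template placeholder.
# check_placeholders is a hypothetical helper; the placeholder names come from this gist.
set -euo pipefail

check_placeholders() {
  local dir="$1"
  # Written as an alternation so this function's own source never matches the pattern.
  local pattern='<(your_bucket|your_app|log_bucket|AWS_key)>'
  local hits
  # -r: recurse, -l: list file names only, -E: extended regex; grep exits 1 on no match.
  hits="$(grep -rlE --include='*.json' --include='*.sh' "$pattern" "$dir" || true)"
  if [ -n "$hits" ]; then
    echo "Unfilled placeholders in:" >&2
    echo "$hits" >&2
    return 1
  fi
}
```

Usage: `check_placeholders . || exit 1` before the first `aws s3 cp`. If the check is added to the launch script itself, it only passes once that script's own `<your_bucket>`, `<log_bucket>` and `<AWS_key>` are filled in.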
start_flink.sh
#!/bin/bash
set -e
# This is a hack that makes all EMR-specific libraries (EMRFS, the AWS SDK, s3-dist-cp) available to the Flink TaskManagers.
# There should be a way to pass them to YARN on the classpath, but I haven't found it yet.
cp /usr/share/aws/emr/emrfs/lib/* /home/hadoop/flink-1.0.0/lib/
cp /usr/share/aws/aws-java-sdk/* /home/hadoop/flink-1.0.0/lib/
cp /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar /home/hadoop/flink-1.0.0/lib/
# Start a YARN session in the background: 2 TaskManagers, 4096 MB for the JobManager and for each TaskManager
HADOOP_CONF_DIR=/etc/hadoop/conf nohup /home/hadoop/flink-1.0.0/bin/yarn-session.sh -n 2 -jm 4096 -tm 4096 >> /home/hadoop/flink-1.0.0/log/yarn-session.log 2>&1 &
# Wait until the session log reports that the JobManager is up
until grep -q 'Flink JobManager is now running' /home/hadoop/flink-1.0.0/log/yarn-session.log; do
  printf .
  sleep 1
done
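The wait loop in start_flink.sh never ends if the YARN session fails to come up (for example, if YARN cannot allocate the requested containers), and because EMR runs steps one after another, that also holds back the "Run Flink" step. A minimal bounded variant, assuming a hypothetical helper `wait_for_log_line` and a timeout you choose yourself:

```shell
#!/bin/bash
# Sketch: wait for a line to appear in a log file, giving up after TIMEOUT_S seconds.
# wait_for_log_line is a hypothetical helper; the timeout is chosen by the caller.
set -euo pipefail

wait_for_log_line() {
  local file="$1" needle="$2" timeout_s="$3"
  local waited=0
  # -F: treat the needle as a fixed string; -q: print nothing, exit 0 on the first match.
  until grep -qF -- "$needle" "$file" 2>/dev/null; do
    if [ "$waited" -ge "$timeout_s" ]; then
      echo "Timed out after ${timeout_s}s waiting for: $needle" >&2
      return 1
    fi
    printf .
    sleep 1
    waited=$((waited + 1))
  done
}
```

Usage: replace the `until` loop with `wait_for_log_line /home/hadoop/flink-1.0.0/log/yarn-session.log 'Flink JobManager is now running' 600`. Under `set -e` a timeout then fails the step instead of hanging it.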
Was there a reason not to set akka.ask.timeout in the EMR configurations.json file?
Is there a way to dynamically change parallelism when autoscaling is turned on in EMR?