Created
September 2, 2016 01:56
-
-
Save huafengw/9451a668f09637fc014df584ef4e25b8 to your computer and use it in GitHub Desktop.
yahoo streaming benchmark distributed shell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Copyright 2015, Yahoo Inc.
# Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
#
# Driver for a distributed streaming benchmark run. Each command-line argument
# names one operation (START_KAFKA, STORM_TEST, STOP_ALL, ...); operations are
# executed in the order given. Run with HELP (or no argument) for the full list.

# Always authenticate with the default RSA key. These are functions rather than
# aliases because bash does not expand aliases in non-interactive scripts unless
# `shopt -s expand_aliases` is set, so an alias here would silently do nothing.
ssh() { command ssh -i ~/.ssh/id_rsa "$@"; }
scp() { command scp -i ~/.ssh/id_rsa "$@"; }

set -o errtrace

# Cluster layout: space-separated host lists, iterated with word splitting.
kafkaCluster="node13-1 node13-2 node13-3 node13-4"
zookeeperCluster="node13-1 node13-2 node13-3"
nimbusNode="node13-1"
stormCluster="node13-1 node13-2 node13-3 node13-4"
loadCluster="node13-1 node13-2 node13-3 node13-4"
# Number of load generators launched on each node of loadCluster.
round=4

# Tool and version settings; each can be overridden from the environment.
LEIN=${LEIN:-lein}
KAFKA_VERSION=${KAFKA_VERSION:-"0.9.0.0"}
REDIS_VERSION=${REDIS_VERSION:-"3.0.5"}
SCALA_BIN_VERSION=${SCALA_BIN_VERSION:-"2.10"}
SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"}
STORM_VERSION=${STORM_VERSION:-"0.10.0"}
FLINK_VERSION=${FLINK_VERSION:-"0.10.1"}
GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.7.5"}

# Installation directories.
BENCHMARK_DIR="/root/huafengw/streaming-benchmarks"
STORM_DIR="/root/apache-storm-$STORM_VERSION"
REDIS_DIR="/root/redis-$REDIS_VERSION"
KAFKA_DIR="/root/kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION"
FLINK_DIR="/root/flink-$FLINK_VERSION"
# NOTE(review): this path is hard-coded and does not follow GEARPUMP_VERSION
# (0.7.5 by default) - confirm that is intentional.
GEARPUMP_DIR="/root/gearpump-2.11-0.7.6-SNAPSHOT"
ZOOKEEPER_DIR="/root/zookeeper-3.4.6"

ZK_CONNECTIONS="node13-1:2181,node13-2:2181,node13-3:2181"
TOPIC=${TOPIC:-"ad-events"}
PARTITIONS=${PARTITIONS:-4}
# Passed as `-t` to each load generator.
LOAD=${LOAD:-17000}
CONF_FILE=./conf/localConf.yaml
# Seconds the *_TEST operations sleep between starting and stopping the load.
TEST_TIME=${TEST_TIME:-1800}
#######################################
# Print the PIDs of all processes whose `ps -aef` line matches a pattern.
# Arguments: $1 - grep pattern, matched against the whole ps output line
# Outputs:   the matching PIDs on a single line, separated by spaces
#            (an empty line when nothing matches)
#######################################
pid_match() {
  local pids
  # -e keeps a pattern that starts with '-' (e.g. a JVM flag) from being
  # parsed as a grep option. `grep -v grep` drops the grep process itself
  # (and any other line containing "grep").
  pids=$(ps -aef | grep -e "$1" | grep -v grep | awk '{print $2}')
  # Unquoted on purpose: collapses the newline-separated PIDs onto one line.
  echo $pids
}
#######################################
# Launch a command in the background unless a matching process already exists.
# Arguments: $1 - pattern handed to pid_match to detect a running instance
#            $2 - human-readable name used in the status message
#            $3 - seconds to sleep after launching, to let the process come up
#            $4... - the command (and its arguments) to launch
# Outputs:   "<name> is already running..." when a match is found
#######################################
start_if_needed() {
  local match="$1"
  shift
  local name="$1"
  shift
  local sleep_time="$1"
  shift
  local pids
  pids=$(pid_match "$match")
  # String test, not -ne: pid_match may return several PIDs ("111 222"),
  # which an arithmetic comparison rejects as a syntax error.
  if [[ -n "$pids" ]]; then
    echo "$name is already running..."
  else
    "$@" &
    sleep "$sleep_time"
  fi
}
#######################################
# Stop every process matching a pattern: SIGTERM first, then SIGKILL for
# anything still alive one second later.
# Arguments: $1 - pattern handed to pid_match
#            $2 - human-readable name used in the status message
# Outputs:   "No <name> instance found to stop" when nothing matches
#######################################
stop_if_needed() {
  local match="$1"
  local name="$2"
  # pid_match prints space-separated PIDs; keep them as an array so that
  # several matches are passed to kill as separate arguments.
  local -a pids=()
  read -r -a pids <<<"$(pid_match "$match")"
  if (( ${#pids[@]} == 0 )); then
    echo "No $name instance found to stop"
    return
  fi

  kill "${pids[@]}"
  sleep 1

  local -a survivors=()
  read -r -a survivors <<<"$(pid_match "$match")"
  if (( ${#survivors[@]} > 0 )); then
    kill -9 "${survivors[@]}"
  fi
}
#######################################
# Extract a tarball in the current directory, downloading it into
# ./download-cache first when it is not cached yet.
# Arguments: $1 - file name inside download-cache/
#            $2 - URL to fetch the file from on a cache miss
# Outputs:   "Using cached File <path>" on a cache hit; tar's verbose listing
# Returns:   non-zero when the download or the extraction fails
#######################################
fetch_untar_file() {
  local FILE="download-cache/$1"
  local URL=$2
  if [[ -e "$FILE" ]]; then
    echo "Using cached File $FILE"
  else
    mkdir -p download-cache/ || return
    # Download under a temporary name: `wget -O` creates its output file even
    # when the transfer fails, and a truncated file left at $FILE would be
    # taken for a valid cached copy on the next run.
    local partial="$FILE.part"
    if ! wget -O "$partial" "$URL"; then
      rm -f -- "$partial"
      echo "Failed to download $URL" >&2
      return 1
    fi
    mv -- "$partial" "$FILE" || return
  fi
  tar -xzvf "$FILE"
}
#######################################
# Create the benchmark Kafka topic unless `kafka-topics.sh --describe`
# already reports it.
# Globals:   KAFKA_DIR, ZK_CONNECTIONS, TOPIC, PARTITIONS (all read)
# Outputs:   "Kafka topic <name> already exists" when nothing has to be done;
#            otherwise whatever kafka-topics.sh --create prints
#######################################
create_kafka_topic() {
  local count
  # Count the describe-output lines that mention the topic. -F matches the
  # name literally, so a '.' in the topic name is not treated as a regex
  # wildcard. Errors from the describe call are discarded, which makes an
  # unreachable ZooKeeper look like "topic missing".
  count=$("$KAFKA_DIR/bin/kafka-topics.sh" --describe --zookeeper "$ZK_CONNECTIONS" \
      --topic "$TOPIC" 2>/dev/null \
    | grep -F -c -e "$TOPIC")
  if [[ "$count" = "0" ]]; then
    "$KAFKA_DIR/bin/kafka-topics.sh" --create --zookeeper "$ZK_CONNECTIONS" \
      --replication-factor 1 --partitions "$PARTITIONS" --topic "$TOPIC"
  else
    echo "Kafka topic $TOPIC already exists"
  fi
}
#######################################
# Execute a single benchmark operation.
#
# ZooKeeper, Kafka, Storm and the load generators are driven over ssh on the
# nodes listed in the *Cluster variables; Redis, Flink and the job submissions
# are started from the machine this script runs on. The composite operations
# (STORM_TEST, GEARPUMP_TEST, FLINK_TEST, STOP_ALL) call run recursively.
#
# Globals (read): zookeeperCluster, kafkaCluster, stormCluster, loadCluster,
#   nimbusNode, round, LEIN, LOAD, CONF_FILE, TEST_TIME, BENCHMARK_DIR,
#   ZOOKEEPER_DIR, REDIS_DIR, STORM_DIR, KAFKA_DIR, FLINK_DIR, GEARPUMP_DIR
# Arguments: $1 - operation name; HELP or anything unknown prints the usage
# Returns:   exits the whole script with status 1 after printing the usage
#######################################
run() {
  local OPERATION=$1
  local node t flink_id

  # The *Cluster variables are space-separated host lists, so the unquoted
  # expansions in the for loops below rely on word splitting on purpose.
  case "$OPERATION" in
    START_ZK)
      for node in $zookeeperCluster; do
        ssh "root@$node" "cd $ZOOKEEPER_DIR; nohup sh bin/zkServer.sh start"
      done
      ;;

    STOP_ZK)
      for node in $zookeeperCluster; do
        # xargs -r: do not invoke kill at all when no process was found.
        ssh "root@$node" "jps -Vv | grep -i 'zookeeper' | awk '{print \$1}' | xargs -r kill"
        # Wipe the ZooKeeper data but keep the node's myid file.
        ssh "root@$node" "find /tmp/zookeeper/* ! -name 'myid' -delete"
      done
      ;;

    START_REDIS)
      start_if_needed redis-server Redis 1 "$REDIS_DIR/src/redis-server"
      # Subshell: the caller's working directory is left untouched, and lein
      # is skipped if ./data is missing instead of running in the wrong place.
      # $LEIN stays unquoted so it may carry extra arguments.
      ( cd data && $LEIN run -n --configPath ../conf/benchmarkConf.yaml )
      ;;

    STOP_REDIS)
      stop_if_needed redis-server Redis
      rm -f dump.rdb
      ;;

    START_STORM)
      for node in $stormCluster; do
        ssh "root@$node" "cd $STORM_DIR;rm -rf logs nohup*"
      done
      ssh "root@$nimbusNode" "cd $STORM_DIR;nohup bin/storm nimbus >/dev/null 2>&1 &"
      ssh "root@$nimbusNode" "cd $STORM_DIR;nohup bin/storm ui >/dev/null 2>&1 &"
      sleep 2
      for node in $stormCluster; do
        ssh "root@$node" "cd $STORM_DIR;nohup bin/storm supervisor > nohup &"
      done
      sleep 10
      ;;

    STOP_STORM)
      ssh "root@$nimbusNode" "jps | grep 'nimbus' | awk '{print \$1}' | xargs -r kill -9"
      ssh "root@$nimbusNode" "jps | grep 'core' | awk '{print \$1}' | xargs -r kill -9"
      for node in $stormCluster; do
        ssh "root@$node" "jps | grep 'supervisor' | awk '{print \$1}' | xargs -r kill -9"
      done
      ;;

    START_KAFKA)
      for node in $kafkaCluster; do
        ssh "root@$node" "cd $KAFKA_DIR;rm -rf logs nohup;nohup bin/kafka-server-start.sh config/server.properties > nohup &"
      done
      create_kafka_topic
      ;;

    STOP_KAFKA)
      for node in $kafkaCluster; do
        ssh "root@$node" "jps -Vv | grep -i 'Kafka' | awk '{print \$1}' | xargs -r kill -9"
        ssh "root@$node" "rm -rf /tmp/kafka-logs"
      done
      ;;

    START_FLINK)
      "$FLINK_DIR/bin/start-cluster.sh"
      ;;

    STOP_FLINK)
      "$FLINK_DIR/bin/stop-cluster.sh"
      ;;

    START_LOAD)
      for node in $loadCluster; do
        ssh "root@$node" "cd $BENCHMARK_DIR/data;rm -rf nohup*"
      done
      # Launch $round generators per node; generator t logs to nohup<t>.
      for (( t = 0; t < round; t++ )); do
        for node in $loadCluster; do
          ssh "root@$node" "cd $BENCHMARK_DIR/data; nohup lein run -r -t $LOAD --configPath ../$CONF_FILE >nohup$t &"
        done
      done
      ;;

    STOP_LOAD)
      for node in $loadCluster; do
        ssh "root@$node" "ps -aef | grep 'leiningen.core.main' | grep -v grep | awk '{print \$2}' | xargs -r kill"
      done
      sleep 5
      # Best effort: a failure of this lein step must not abort the teardown.
      ( cd data && $LEIN run -g --configPath "../$CONF_FILE" ) || true
      ;;

    START_STORM_TOPOLOGY)
      "$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf "$CONF_FILE"
      sleep 15
      ;;

    STOP_STORM_TOPOLOGY)
      "$STORM_DIR/bin/storm" kill -w 0 test-topo || true
      sleep 10
      ;;

    START_GEARPUMP_APP)
      "$GEARPUMP_DIR/bin/gear" app -jar ./gearpump-benchmarks/target/gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising "$CONF_FILE" &
      #"$GEARPUMP_DIR/bin/storm" -config ./conf/gear.yaml -jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology testtopo -conf $CONF_FILE &
      sleep 5
      ;;

    STOP_GEARPUMP_APP)
      # Nothing is stopped here; the operation only waits.
      sleep 1
      ;;

    START_FLINK_PROCESSING)
      "$FLINK_DIR/bin/flink" run ./flink-benchmarks/target/flink-benchmarks-0.1.0.jar --confPath "$CONF_FILE" &
      sleep 3
      ;;

    STOP_FLINK_PROCESSING)
      # Fourth column of the matching `flink list` line(s) is used as job ID.
      flink_id=$("$FLINK_DIR/bin/flink" list | grep 'Flink Streaming Job' | awk '{print $4}')
      if [[ -z "$flink_id" ]]; then
        echo "Could not find streaming job to kill"
      else
        # Unquoted on purpose: if several lines matched, every ID is passed
        # as its own argument, exactly as before.
        "$FLINK_DIR/bin/flink" cancel $flink_id
        sleep 3
      fi
      ;;

    STORM_TEST)
      run "START_ZK"
      run "START_REDIS"
      run "START_KAFKA"
      run "START_STORM"
      run "START_STORM_TOPOLOGY"
      run "START_LOAD"
      sleep "$TEST_TIME"
      run "STOP_LOAD"
      run "STOP_STORM_TOPOLOGY"
      run "STOP_STORM"
      run "STOP_KAFKA"
      run "STOP_REDIS"
      run "STOP_ZK"
      ;;

    GEARPUMP_TEST)
      run "START_ZK"
      run "START_REDIS"
      run "START_KAFKA"
      run "START_GEARPUMP_APP"
      run "START_LOAD"
      sleep "$TEST_TIME"
      run "STOP_LOAD"
      run "STOP_GEARPUMP_APP"
      run "STOP_KAFKA"
      run "STOP_REDIS"
      run "STOP_ZK"
      ;;

    FLINK_TEST)
      run "START_ZK"
      run "START_REDIS"
      run "START_KAFKA"
      run "START_FLINK"
      run "START_FLINK_PROCESSING"
      run "START_LOAD"
      sleep "$TEST_TIME"
      run "STOP_LOAD"
      run "STOP_FLINK_PROCESSING"
      run "STOP_FLINK"
      #run "STOP_KAFKA"
      run "STOP_REDIS"
      #run "STOP_ZK"
      ;;

    STOP_ALL)
      run "STOP_LOAD"
      run "STOP_GEARPUMP_APP"
      run "STOP_STORM_TOPOLOGY"
      run "STOP_FLINK_PROCESSING"
      run "STOP_FLINK"
      run "STOP_STORM"
      run "STOP_KAFKA"
      run "STOP_REDIS"
      run "STOP_ZK"
      ;;

    *)
      if [[ "HELP" != "$OPERATION" ]]; then
        echo "UNKOWN OPERATION '$OPERATION'"
        echo
      fi
      # The list mirrors the operations handled above.
      printf '%s\n' \
        "Supported Operations:" \
        "START_ZK: start ZooKeeper on every node of the ZooKeeper cluster" \
        "STOP_ZK: kill the ZooKeeper instances" \
        "START_REDIS: run a redis instance in the background" \
        "STOP_REDIS: kill the redis instance" \
        "START_KAFKA: run kafka in the background" \
        "STOP_KAFKA: kill kafka" \
        "START_LOAD: run kafka load generation" \
        "STOP_LOAD: kill kafka load generation" \
        "START_STORM: run storm daemons in the background" \
        "STOP_STORM: kill the storm daemons" \
        "START_FLINK: run flink processes" \
        "STOP_FLINK: kill flink processes" \
        "" \
        "START_STORM_TOPOLOGY: run the storm test topology" \
        "STOP_STORM_TOPOLOGY: kill the storm test topology" \
        "START_FLINK_PROCESSING: run the flink test processing" \
        "STOP_FLINK_PROCESSING: kill the flink test processing" \
        "START_GEARPUMP_APP: submit the gearpump test application" \
        "STOP_GEARPUMP_APP: placeholder, only waits for one second" \
        "" \
        "STORM_TEST: run storm test (assumes dependencies are installed)" \
        "GEARPUMP_TEST: run gearpump test (assumes dependencies are installed)" \
        "FLINK_TEST: run flink test (assumes dependencies are installed)" \
        "STOP_ALL: stop everything" \
        "" \
        "HELP: print out this message" \
        ""
      exit 1
      ;;
  esac
}
# Entry point: without arguments print the usage, otherwise execute every
# requested operation in the order it was given on the command line.
if (( $# == 0 )); then
  run "HELP"
else
  for requested_op in "$@"; do
    run "$requested_op"
  done
fi
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment