- This demo is part of a webinar on 'HBase for Mission Critical Applications'
- The webinar recording/slides are available at http://hortonworks.com/partners/learn/#hbase2
- The Ambari service for OpenTSDB is available at https://github.com/abajwa-hw/opentsdb-service
- The full list of available workshops is available at http://hortonworks.com/partners/learn
There is a prebuilt CentOS 6.5 VM with the following components installed:
- HDP 2.3.0.0-1754
- Spark 1.3.1
- Ambari service/view for OpenTSDB
- Ambari service/view for Zeppelin
- Intraday stock price data from Google finance in both csv and opentsdb format
- Data for AAPL, GOOG, HDP for April 2015 has already been imported into both OpenTSDB and the PRICES table in Phoenix (a sample of the OpenTSDB import layout is shown below)
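For reference, OpenTSDB's bulk import format expects one data point per line in the form metric timestamp value tag=value. The metric names, timestamp and values below are hypothetical and only illustrate the layout, not the actual data shipped with the VM:
#illustrative OpenTSDB import lines (hypothetical metric names and values)
stock.price.close 1428412800 124.25 symbol=AAPL
stock.price.volume 1428412800 1200000 symbol=AAPL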
Steps:
- Download the .ova file from here and import it into VMware Fusion
- Once it starts, create a hosts file entry on your local machine for sandbox.hortonworks.com (see the example after this list)
- Launch Ambari by clicking here: http://sandbox.hortonworks.com:8080 and log in as admin/admin
- Start the HBase, OpenTSDB and (optionally) Zeppelin services
- Try the phoenix-spark examples by opening an SSH session to the VM (ssh [email protected]) and following the steps here
- Try the OpenTSDB view and query data using the steps here
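A minimal hosts file entry for the step above (on OSX/Linux this is /etc/hosts); the IP address below is only a placeholder, use the address your VM actually reports:
#replace 192.168.191.241 with your VM's IP address
192.168.191.241  sandbox.hortonworks.com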
- Setup a VM running HDP 2.3/Ambari 2.1 build 233 using this repo file
- Download Spark 1.3.1
#detect the installed HDP version and persist it for later sessions
export HDP_VER=`hdp-select status hadoop-client | sed 's/hadoop-client - \(.*\)/\1/'`
echo "export HDP_VER=$HDP_VER" >> ~/.bashrc
#download and unpack the Spark 1.3.1 distribution
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.3.1-bin-hadoop2.6.tgz
tar -xzvf spark-1.3.1-bin-hadoop2.6.tgz
#point Spark at the HDP stack version when running on YARN
echo "spark.driver.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
echo "spark.yarn.am.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
#copy hbase-site.xml
cp /etc/hbase/conf/hbase-site.xml spark-1.3.1-bin-hadoop2.6/conf/
export YARN_CONF_DIR=/etc/hadoop/conf
echo "export YARN_CONF_DIR=$YARN_CONF_DIR" >> ~/.bashrc
- Turn off HBase maintenance mode
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Turn off maintenance mode for HBase"}, "Body": {"ServiceInfo": {"maintenance_state": "OFF"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
- Start HBase via the Ambari REST API
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start HBASE via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
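- (Optional) The start request is asynchronous; a hedged way to confirm HBase actually reached the STARTED state is to poll the service state via the same Ambari REST API
#check the HBase service state (repeat until it reports "STARTED")
curl -u admin:admin -s "http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE?fields=ServiceInfo/state"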
- Run the Python script to generate the stock price CSV and OpenTSDB input files
cd
/bin/rm -f prices.csv
/bin/rm -f opentsd.input
wget https://raw.githubusercontent.com/abajwa-hw/opentsdb-service/master/scripts/google_intraday.py
python google_intraday.py AAPL > prices.csv
python google_intraday.py GOOG >> prices.csv
python google_intraday.py HDP >> prices.csv
python google_intraday.py ORCL >> prices.csv
python google_intraday.py MSFT >> prices.csv
#check output files
tail prices.csv opentsd.input
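- (Optional) A quick line count of both files confirms data was generated for all five symbols
#count the generated lines in each output file
wc -l prices.csv opentsd.input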
- Create a SQL file that defines the Phoenix table
vi ~/prices.sql
drop table if exists PRICES;
create table PRICES (
SYMBOL varchar(10),
DATE varchar(10),
TIME varchar(10),
OPEN varchar(10),
HIGH varchar(10),
LOW varchar(10),
CLOSE varchar(10),
VOLUME varchar(30),
CONSTRAINT pk PRIMARY KEY (SYMBOL, DATE, TIME)
);
- Create the Phoenix table and populate it with the CSV data
/usr/hdp/2*/phoenix/bin/psql.py sandbox.hortonworks.com:2181:/hbase-unsecure ~/prices.sql ~/prices.csv
- Connect to HBase via the Phoenix sqlline client
/usr/hdp/*/phoenix/bin/sqlline.py sandbox.hortonworks.com:2181:/hbase-unsecure
- Run a sample query
select * from prices order by DATE, TIME limit 20;
!q
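- (Optional) Run an extra verification query in sqlline before quitting; a hedged example that counts loaded rows per symbol
select SYMBOL, count(*) from PRICES group by SYMBOL;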
- Start the Spark shell
export SPARK_CLASSPATH=/etc/hbase/conf:/usr/hdp/2.3.0.0-1754/hbase/lib/hbase-protocol.jar
#start spark shell and pass in relevant jars to classpath
/root/spark-1.3.1-bin-hadoop2.6/bin/spark-shell --driver-memory 512m --executor-memory 512m --conf hdp.version=$HDP_VER --jars \
/usr/hdp/2.3.0.0-1754/phoenix/phoenix-4.4.0.2.3.0.0-1754-client.jar
- Load the Phoenix table as an RDD, using a ZooKeeper URL. Other phoenix-spark examples are available here
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD
val sqlCtx = new org.apache.spark.sql.SQLContext(sc)
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
"PRICES", Seq("SYMBOL","DATE","TIME", "OPEN","HIGH","LOW","CLOSE","VOLUME"), zkUrl = Some("localhost:2181:/hbase-unsecure")
)
// count rows
rdd.count()
//get first row
rdd.first()
//get fields from first row
val firstSymbol = rdd.first()("SYMBOL").asInstanceOf[String]
val firstVolume = rdd.first()("VOLUME").asInstanceOf[String]
//print top 100 rows
rdd.take(100).foreach(println)
// print all rows where AAPL volume is above 100100
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("VOLUME").asInstanceOf[String].toInt > 100100).foreach(println)
// print all rows where AAPL low is below 130.0
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("LOW").asInstanceOf[String].toDouble < 130.0).foreach(println)
// compute the (HIGH + LOW) / 2 midpoint for each row
val avg_each_row = rdd.map(row => ( row("HIGH").asInstanceOf[String].toDouble + row("LOW").asInstanceOf[String].toDouble ) / 2.0 )
avg_each_row.foreach(println)
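- (Optional) The SQLContext created above (sqlCtx) can also load the same table through the phoenix-spark DataFrame API; the following is a hedged sketch assuming the same ZooKeeper URL and the PRICES columns defined earlier (Spark 1.3-style API)
// load the PRICES table as a DataFrame via phoenix-spark
val df = sqlCtx.phoenixTableAsDataFrame(
  "PRICES", Seq("SYMBOL","DATE","TIME","CLOSE","VOLUME"), zkUrl = Some("localhost:2181:/hbase-unsecure")
)
// register as a temporary table and query it with SQL
df.registerTempTable("prices_tmp")
sqlCtx.sql("select SYMBOL, count(*) as cnt from prices_tmp group by SYMBOL").show()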