Skip to content

Instantly share code, notes, and snippets.

@dbist
Last active July 24, 2018 15:32
Show Gist options
  • Select an option

  • Save dbist/c28c9af7398017a3a5de32e017efa250 to your computer and use it in GitHub Desktop.

Select an option

Save dbist/c28c9af7398017a3a5de32e017efa250 to your computer and use it in GitHub Desktop.
airflow-usecase
# Stage the raw CSV in the HDFS directory that backs the external Hive table.
# -p keeps this step re-runnable when /tmp/data already exists.
hdfs dfs -mkdir -p /tmp/data
hdfs dfs -mv /tmp/data.csv /tmp/data
# Open up permissions only after the move so that -R also covers data.csv.
hdfs dfs -chmod -R 777 /tmp/data
-- Hive (run the statements below in Beeline)
-- Beeline command: open a JDBC session to HiveServer2 on aervits-hdp1:10000,
-- passing empty strings for the username and password arguments.
!connect jdbc:hive2://aervits-hdp1:10000 "" ""
-- External staging table over the raw CSV files stored under /tmp/data.
-- Every column is declared STRING; the table only exposes the file contents.
-- OpenCSVSerde replaces a plain ',' split so that a quoted field containing
-- a comma stays in a single column instead of shifting every later column.
-- NOTE(review): LOCATION presumably holds a quoted "(lat, long)" pair, which
--   is what a naive comma split would break -- confirm against the data.
-- NOTE(review): if the files start with a header row, also add
--   TBLPROPERTIES ('skip.header.line.count' = '1') -- confirm.
CREATE EXTERNAL TABLE IF NOT EXISTS traffic_csv (
    `DT` STRING,
    `TM` STRING,
    `BOROUGH` STRING,
    `ZIP CODE` STRING,
    `LATITUDE` STRING,
    `LONGITUDE` STRING,
    `LOCATION` STRING,
    `ON STREET NAME` STRING,
    `CROSS STREET NAME` STRING,
    `OFF STREET NAME` STRING,
    `NUMBER OF PERSONS INJURED` STRING,
    `NUMBER OF PERSONS KILLED` STRING,
    `NUMBER OF PEDESTRIANS INJURED` STRING,
    `NUMBER OF PEDESTRIANS KILLED` STRING,
    `NUMBER OF CYCLIST INJURED` STRING,
    `NUMBER OF CYCLIST KILLED` STRING,
    `NUMBER OF MOTORIST INJURED` STRING,
    `NUMBER OF MOTORIST KILLED` STRING,
    `CONTRIBUTING FACTOR VEHICLE 1` STRING,
    `CONTRIBUTING FACTOR VEHICLE 2` STRING,
    `CONTRIBUTING FACTOR VEHICLE 3` STRING,
    `CONTRIBUTING FACTOR VEHICLE 4` STRING,
    `CONTRIBUTING FACTOR VEHICLE 5` STRING,
    `UNIQUE KEY` STRING,
    `VEHICLE TYPE CODE 1` STRING,
    `VEHICLE TYPE CODE 2` STRING,
    `VEHICLE TYPE CODE 3` STRING,
    `VEHICLE TYPE CODE 4` STRING,
    `VEHICLE TYPE CODE 5` STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"'
)
STORED AS TEXTFILE
LOCATION '/tmp/data';
-- Managed ORC table holding the same columns as traffic_csv, with BOROUGH
-- moved out of the column list and used as the partition key.
-- No ROW FORMAT DELIMITED clause: ORC is a binary columnar format, so a text
-- field delimiter has no effect on how the data is stored or read.
CREATE TABLE IF NOT EXISTS traffic (
    `DT` STRING,
    `TM` STRING,
    `ZIP CODE` STRING,
    `LATITUDE` STRING,
    `LONGITUDE` STRING,
    `LOCATION` STRING,
    `ON STREET NAME` STRING,
    `CROSS STREET NAME` STRING,
    `OFF STREET NAME` STRING,
    `NUMBER OF PERSONS INJURED` STRING,
    `NUMBER OF PERSONS KILLED` STRING,
    `NUMBER OF PEDESTRIANS INJURED` STRING,
    `NUMBER OF PEDESTRIANS KILLED` STRING,
    `NUMBER OF CYCLIST INJURED` STRING,
    `NUMBER OF CYCLIST KILLED` STRING,
    `NUMBER OF MOTORIST INJURED` STRING,
    `NUMBER OF MOTORIST KILLED` STRING,
    `CONTRIBUTING FACTOR VEHICLE 1` STRING,
    `CONTRIBUTING FACTOR VEHICLE 2` STRING,
    `CONTRIBUTING FACTOR VEHICLE 3` STRING,
    `CONTRIBUTING FACTOR VEHICLE 4` STRING,
    `CONTRIBUTING FACTOR VEHICLE 5` STRING,
    `UNIQUE KEY` STRING,
    `VEHICLE TYPE CODE 1` STRING,
    `VEHICLE TYPE CODE 2` STRING,
    `VEHICLE TYPE CODE 3` STRING,
    `VEHICLE TYPE CODE 4` STRING,
    `VEHICLE TYPE CODE 5` STRING
)
PARTITIONED BY (BOROUGH STRING)
STORED AS ORC;
-- Allow every partition value to be derived from the data (no static
-- partition key is supplied in the INSERT below).
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Rewrite traffic from the CSV staging table. BOROUGH is selected last
-- because the dynamic partition column is taken from the end of the
-- select list.
INSERT OVERWRITE TABLE traffic PARTITION (BOROUGH)
SELECT
    `DT`,
    `TM`,
    `ZIP CODE`,
    `LATITUDE`,
    `LONGITUDE`,
    `LOCATION`,
    `ON STREET NAME`,
    `CROSS STREET NAME`,
    `OFF STREET NAME`,
    `NUMBER OF PERSONS INJURED`,
    `NUMBER OF PERSONS KILLED`,
    `NUMBER OF PEDESTRIANS INJURED`,
    `NUMBER OF PEDESTRIANS KILLED`,
    `NUMBER OF CYCLIST INJURED`,
    `NUMBER OF CYCLIST KILLED`,
    `NUMBER OF MOTORIST INJURED`,
    `NUMBER OF MOTORIST KILLED`,
    `CONTRIBUTING FACTOR VEHICLE 1`,
    `CONTRIBUTING FACTOR VEHICLE 2`,
    `CONTRIBUTING FACTOR VEHICLE 3`,
    `CONTRIBUTING FACTOR VEHICLE 4`,
    `CONTRIBUTING FACTOR VEHICLE 5`,
    `UNIQUE KEY`,
    `VEHICLE TYPE CODE 1`,
    `VEHICLE TYPE CODE 2`,
    `VEHICLE TYPE CODE 3`,
    `VEHICLE TYPE CODE 4`,
    `VEHICLE TYPE CODE 5`,
    `BOROUGH`
FROM traffic_csv;
-- Remove partitions that do not correspond to a real borough: the default
-- partition (rows whose BOROUGH was NULL or empty) and the literal value '0'.
-- IF EXISTS keeps the script re-runnable when a partition is already gone;
-- the value is quoted because BOROUGH is a STRING partition column.
ALTER TABLE traffic DROP IF EXISTS PARTITION (BOROUGH='__HIVE_DEFAULT_PARTITION__');
ALTER TABLE traffic DROP IF EXISTS PARTITION (BOROUGH='0');
-- Number of rows with a non-NULL dt for each (tm, borough) pair.
SELECT
    COUNT(dt),
    tm,
    borough
FROM traffic
GROUP BY
    tm,
    borough;
# PySpark
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
# warehouse_location points to the default location for managed databases and
# tables. The HDFS URI is passed through unchanged: os.path.abspath() treats
# its argument as a local path, so it would prefix the current working
# directory and collapse the '//' after the scheme.
warehouse_location = 'hdfs://aervits-hdp0:8020/apps/hive'

# Build (or reuse) a session with Hive support enabled so that spark.sql()
# can query the TRAFFIC table.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

# To preview the aggregate interactively:
# spark.sql("SELECT COUNT(DT), TM, BOROUGH FROM TRAFFIC group by TM, BOROUGH").show()
# NOTE(review): the COUNT column is unaliased, so its name is generated by
# Spark; consider adding an alias once consumers of trafficDF are confirmed.
sqlDf = spark.sql("SELECT COUNT(DT), TM, BOROUGH FROM TRAFFIC group by TM, BOROUGH")

# The temp view must not share a name with the target table. Spark SQL
# identifiers are case-insensitive by default, so 'trafficDf' and 'trafficDF'
# are the same name; sharing it makes both the DROP TABLE target and the
# CTAS source ambiguous between the view and the table.
sqlDf.createOrReplaceTempView("trafficDf_staging")
spark.sql("DROP TABLE IF EXISTS trafficDF")
spark.sql("CREATE TABLE trafficDF stored as ORC AS SELECT * FROM trafficDf_staging")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment