Skip to content

Instantly share code, notes, and snippets.

@david30907d
Last active August 30, 2017 13:15
Show Gist options
  • Save david30907d/9aa5ae6b8f27fba81bf95ffc86c54a23 to your computer and use it in GitHub Desktop.
Save david30907d/9aa5ae6b8f27fba81bf95ffc86c54a23 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import json

# NOTE(review): `sc` is assumed to be an existing SparkContext (pyspark
# shell / notebook environment) -- this script never creates one itself.
t = sc.textFile('Rides_0310.csv')

# Split each CSV line into fields once and reuse the resulting RDD for both
# the header lookup and the data rows (the original re-split the file twice).
_rows = t.map(lambda line: line.split(','))

# The first line of the file is the header row.
header = _rows.first()

# Drop any row containing a field exactly equal to 'Id' -- this is how the
# header row is filtered out. NOTE(review): a data row that happened to
# contain the literal value 'Id' would also be dropped -- TODO confirm.
data = _rows.filter(lambda fields: 'Id' not in fields)

# Turn each data row into a {column_name: value} dict keyed by the header.
dataDict = data.map(lambda fields: dict(zip(header, fields)))

# Emit two (stop, time) pairs per ride record: one for boarding and one for
# alighting, so both events can be counted per stop downstream.
stop_and_time = dataDict.flatMap(
    lambda ride: ((ride['BoardStop'], ride['BoardTime']),
                  (ride['AlightStop'], ride['AlightTime'])))
def groupDate(x):
    """Count events per 10-minute time bucket for each stop.

    Intended for RDD.mapPartitions after groupByKey: ``x`` yields
    ``(stop, iterable_of_timestamps)`` pairs, where each timestamp looks
    like ``'2015-05-06 07:00:00.000'``.

    Bucketing: everything from the last ``':'`` onward is dropped, then one
    more trailing character, e.g. ``'2015-05-06 07:00:00.000'`` becomes
    ``'2015-05-06 07:0'`` -- i.e. timestamps are grouped into 10-minute
    buckets.

    Returns a list of ``(stop, {bucket: count})`` pairs.
    """
    result = []
    for stop, times in x:
        counts = {}
        for ts in times:
            # Strip seconds/millis, then the last minute digit -> 10-min bucket.
            bucket = ts[:ts.rfind(':')][:-1]
            counts[bucket] = counts.get(bucket, 0) + 1
        result.append((stop, counts))
    return result
# Group all timestamps by stop, bucket them per partition, and bring the
# per-stop 10-minute counts back to the driver.
stop = stop_and_time.groupByKey().mapPartitions(groupDate).collect()

# Persist as JSON. Use a context manager so the output file is flushed and
# closed deterministically (the original leaked the handle from a bare open()).
with open('zhou.json', 'w') as fout:
    json.dump(dict(stop), fout)
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# @hidden_cell\n",
"# This function is used to set up the access of Spark to your Object Storage. The definition contains your credentials.\n",
"# You might want to remove those credentials before you share your notebook.\n",
"def set_hadoop_config_with_credentials_46719cd25b4947838b86f8a11f0314d4(name):\n",
" \"\"\"This function sets the Hadoop configuration so it is possible to\n",
" access data from Bluemix Object Storage using Spark\"\"\"\n",
"\n",
" prefix = 'fs.swift.service.' + name\n",
" hconf = sc._jsc.hadoopConfiguration()\n",
" hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')\n",
" hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')\n",
" hconf.set(prefix + '.tenant', '646b1215221e4e7d8254de7bdf43bf1f')\n",
" hconf.set(prefix + '.username', 'b34d9ea93275426393c59c24493ef000')\n",
" hconf.set(prefix + '.password', 'b.sgybLPA&qc50y7')\n",
" hconf.setInt(prefix + '.http.port', 8080)\n",
" hconf.set(prefix + '.region', 'dallas')\n",
" hconf.setBoolean(prefix + '.public', False)\n",
"\n",
"# you can choose any name\n",
"name = 'keystone'\n",
"set_hadoop_config_with_credentials_46719cd25b4947838b86f8a11f0314d4(name)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"weather_data = sc.textFile(\"swift://DefaultProjectyfannchuedutw.\" + name + \"/pm2.5Taiwan.csv\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"日期 測站 測項 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 \n",
"2015/01/01 龍潭 AMB_TEMP 14 14 14 13 13 13 12 12 13 14 14 14 14 14 13 13 12 11 11 11 11 11 11 11 \n",
"2015/01/01 龍潭 CO 0.69 0.72 0.69 0.64 0.54 0.47 0.45 0.48 0.51 0.54 0.54 0.5 0.47 0.38 0.36 0.35 0.34 0.37 0.34 0.29 0.26 0.22 0.19 0.18 \n",
"2015/01/01 龍潭 NO 0.3 0.1 0.6 2 2 1.9 2.2 3.1 3.7 4.3 4.3 4.5 3.3 4.1 3.1 3.6 3.6 2.8 2.8 2.5 2.2 1.4 2.1 2 \n",
"2015/01/01 龍潭 NO2 11 9.6 8.7 9.1 9.6 9.9 11 13 11 12 12 11 11 9.9 9.9 10 11 13 11 10 8.2 7.3 6.5 5.5 \n",
"2015/01/01 龍潭 NOx 11 9.7 9.3 11 12 12 13 17 15 16 16 16 14 14 13 14 15 16 14 13 10 8.7 8.6 7.5 \n",
"2015/01/01 龍潭 O3 44 43 43 44 41 39 38 34 37 37 39 44 47 49 48 44 39 37 37 39 39 38 38 39 \n",
"2015/01/01 龍潭 PM10 106 138 152 152 143 128 115 106 102 105 108 114 108 96 82 74 72 72 70 60 50 37 38 40 \n",
"2015/01/01 龍潭 PM2.5 46 71 76 74 65 62 56 50 52 56 54 47 40 36 37 27 30 25 26 24 18 16 11 14 \n",
"2015/01/01 龍潭 RAINFALL NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR \n",
"2015/01/01 龍潭 RH 70 69 70 70 71 72 70 69 64 60 57 53 52 52 53 56 59 60 63 62 60 62 65 64 \n",
"2015/01/01 龍潭 SO2 8.6 8.5 7.6 6.7 7.4 6.4 6.2 7.2 6.7 6.5 5.5 4.8 4.1 3.2 2.8 2.8 3.1 2.8 2.5 2 1.6 1.5 1.4 1.2 \n",
"2015/01/01 龍潭 WD_HR 66 70 69 68 67 72 74 72 66 66 63 60 59 62 59 63 64 62 62 62 59 59 59 61 \n",
"2015/01/01 龍潭 WIND_DIREC 67 70 68 67 66 75 75 73 68 64 59 59 65 62 60 68 60 61 62 61 59 63 65 58 \n",
"2015/01/01 龍潭 WIND_SPEED 8.7 7.5 7.7 7.2 6.9 5.3 6.5 6.4 7 7.1 7.5 7.3 6.9 7.4 8 8.3 6.6 7.3 7.1 7 6.9 7.5 6.8 5.8 \n",
"2015/01/01 龍潭 WS_HR 8 7.7 7.2 6.9 7 6.6 6.3 6.2 7.1 7 7.1 7.6 7.5 7.5 7.9 7.9 7.2 6.9 7.2 6.9 6.7 6.9 6.9 5.7 \n",
"2015/01/02 龍潭 AMB_TEMP 11 11 11 11 11 11 11 11 13 14 15 16 16 16 16 16 15 14 13 13 13 12 12 12 \n",
"2015/01/02 龍潭 CO 0.17 0.17 0.18 0.18 0.23 0.24 0.26 0.33 0.34 0.35 0.35 0.34 0.32 0.31 0.3 0.32 0.3 0.34 0.33 0.3 0.28 0.28 0.27 0.26 \n",
"2015/01/02 龍潭 NO 2.2 1.6 1.7 1.8 2 1.9 2.5 3.4 4.5 5 4.6 4.6 4.3 4 4 3.6 3.6 3.1 2.7 2.6 2.9 2.3 1.9 2.2 \n",
"2015/01/02 龍潭 NO2 5 5.4 5.1 5.1 5.6 7 7.5 9.6 9.7 9.5 7.8 7.4 7.1 7.1 7.9 9.2 11 13 12 10 9.2 8.5 8.8 9.2 \n",
"2015/01/02 龍潭 NOx 7.2 7 6.8 6.9 7.6 9 10 13 14 14 12 12 11 11 12 13 14 16 15 13 12 11 11 11 \n",
"2015/01/02 龍潭 O3 40 39 40 39 37 35 35 33 33 33 36 39 39 40 39 38 36 33 34 36 36 36 36 35 \n",
"2015/01/02 龍潭 PM10 40 40 40 47 49 45 49 47 55 50 54 56 59 54 53 47 52 46 44 32 26 28 31 37 \n",
"2015/01/02 龍潭 PM2.5 15 12 9 14 17 20 18 22 21 23 18 25 24 27 18 23 18 19 18 21 23 18 19 19 \n",
"2015/01/02 龍潭 RAINFALL NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR NR \n",
"2015/01/02 龍潭 RH 64 66 67 70 71 71 68 64 61 60 57 56 56 54 54 55 60 64 66 67 68 68 69 69 \n",
"2015/01/02 龍潭 SO2 1.6 1.9 1.9 2.1 2 2.1 1.8 1.8 1.8 1.9 1.7 1.5 1.4 1.5 1.6 1.6 1.5 1.5 1.4 1.3 1.3 1.1 1.1 1.1 \n",
"2015/01/02 龍潭 WD_HR 61 61 54 55 56 56 56 78 74 71 65 60 59 61 63 63 63 87 67 91 89 110 108 100 \n",
"2015/01/02 龍潭 WIND_DIREC 63 61 56 53 55 56 57 117 57 73 66 61 60 56 62 55 70 78 86 101 102 115 103 102 \n",
"2015/01/02 龍潭 WIND_SPEED 5.5 5.8 5.1 4.4 5.2 4.3 4.8 5.1 5.4 6.1 5.7 6.6 5.9 6 6.5 5.7 5.3 5.4 5.6 5.2 5.3 4.8 3.9 3.8 \n"
]
}
],
"source": [
"weather_data_rdd = weather_data.map(lambda line : line.split(\",\"))\n",
"for x in weather_data_rdd.take(30):\n",
" for i in range(len(x)):\n",
" print x[i],\n",
" print \"\""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[u'\\u65e5\\u671f', u'\\u6e2c\\u7ad9', u'\\u6e2c\\u9805', u'00', u'01', u'02', u'03', u'04', u'05', u'06', u'07', u'08', u'09', u'10', u'11', u'12', u'13', u'14', u'15', u'16', u'17', u'18', u'19', u'20', u'21', u'22', u'23']\n"
]
}
],
"source": [
"pm25schema = weather_data_rdd.first()\n",
"print pm25schema"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import math\n",
"def remove_row_with_noise (x):\n",
" for i in range(3, len(x)):\n",
" if not x[i].isdecimal():\n",
" return False\n",
" return True "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"PythonRDD[4] at RDD at PythonRDD.scala:48"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"clean_weather_data = weather_data_rdd\\\n",
" .filter(lambda x: x!=pm25schema)\\\n",
" .filter(remove_row_with_noise)\n",
"clean_weather_data.cache()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dalipm25 = clean_weather_data.filter(lambda x: x[1] == u'大里')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(date=u'2015/01/01', hr_01=14.0, hr_02=14.0, hr_03=13.0, hr_04=12.0, hr_05=12.0, hr_06=12.0, hr_07=12.0, hr_08=12.0, hr_09=13.0, hr_10=15.0, hr_11=18.0, hr_12=19.0, hr_13=20.0, hr_14=20.0, hr_15=19.0, hr_16=18.0, hr_17=17.0, hr_18=15.0, hr_19=14.0, hr_20=13.0, hr_21=13.0, hr_22=13.0, hr_23=12.0, hr_24=13.0, location=u'\\u5927\\u91cc', measure=u'AMB_TEMP'),\n",
" Row(date=u'2015/01/01', hr_01=33.0, hr_02=32.0, hr_03=24.0, hr_04=24.0, hr_05=24.0, hr_06=23.0, hr_07=19.0, hr_08=18.0, hr_09=27.0, hr_10=25.0, hr_11=16.0, hr_12=13.0, hr_13=12.0, hr_14=14.0, hr_15=14.0, hr_16=14.0, hr_17=18.0, hr_18=21.0, hr_19=24.0, hr_20=21.0, hr_21=23.0, hr_22=23.0, hr_23=20.0, hr_24=17.0, location=u'\\u5927\\u91cc', measure=u'NO2'),\n",
" Row(date=u'2015/01/01', hr_01=61.0, hr_02=61.0, hr_03=45.0, hr_04=45.0, hr_05=42.0, hr_06=46.0, hr_07=51.0, hr_08=50.0, hr_09=59.0, hr_10=39.0, hr_11=20.0, hr_12=16.0, hr_13=15.0, hr_14=17.0, hr_15=16.0, hr_16=17.0, hr_17=20.0, hr_18=23.0, hr_19=26.0, hr_20=23.0, hr_21=24.0, hr_22=25.0, hr_23=21.0, hr_24=19.0, location=u'\\u5927\\u91cc', measure=u'NOx'),\n",
" Row(date=u'2015/01/01', hr_01=53.0, hr_02=55.0, hr_03=58.0, hr_04=53.0, hr_05=43.0, hr_06=36.0, hr_07=35.0, hr_08=42.0, hr_09=55.0, hr_10=64.0, hr_11=65.0, hr_12=59.0, hr_13=52.0, hr_14=44.0, hr_15=47.0, hr_16=41.0, hr_17=43.0, hr_18=40.0, hr_19=42.0, hr_20=35.0, hr_21=28.0, hr_22=20.0, hr_23=18.0, hr_24=16.0, location=u'\\u5927\\u91cc', measure=u'PM2.5'),\n",
" Row(date=u'2015/01/01', hr_01=81.0, hr_02=79.0, hr_03=82.0, hr_04=83.0, hr_05=83.0, hr_06=84.0, hr_07=83.0, hr_08=84.0, hr_09=78.0, hr_10=67.0, hr_11=53.0, hr_12=47.0, hr_13=43.0, hr_14=42.0, hr_15=42.0, hr_16=44.0, hr_17=47.0, hr_18=51.0, hr_19=54.0, hr_20=56.0, hr_21=57.0, hr_22=58.0, hr_23=59.0, hr_24=59.0, location=u'\\u5927\\u91cc', measure=u'RH')]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import SQLContext\n",
"from pyspark.sql import Row\n",
"dalipm25row = dalipm25.map(lambda p:\n",
" Row(\n",
" **dict(zip(pm25schema, p))\n",
" )\n",
")\n",
"\n",
"\n",
"dalipm25row.take(5)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = sqlContext.createDataFrame(dalipm25row)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+----------+\n",
"| date|hr_01|hr_02|hr_03|hr_04|hr_05|hr_06|hr_07|hr_08|hr_09|hr_10|hr_11|hr_12|hr_13|hr_14|hr_15|hr_16|hr_17|hr_18|hr_19|hr_20|hr_21|hr_22|hr_23|hr_24|location| measure|\n",
"+----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+----------+\n",
"|2015/01/01| 14.0| 14.0| 13.0| 12.0| 12.0| 12.0| 12.0| 12.0| 13.0| 15.0| 18.0| 19.0| 20.0| 20.0| 19.0| 18.0| 17.0| 15.0| 14.0| 13.0| 13.0| 13.0| 12.0| 13.0| 大里| AMB_TEMP|\n",
"|2015/01/01| 33.0| 32.0| 24.0| 24.0| 24.0| 23.0| 19.0| 18.0| 27.0| 25.0| 16.0| 13.0| 12.0| 14.0| 14.0| 14.0| 18.0| 21.0| 24.0| 21.0| 23.0| 23.0| 20.0| 17.0| 大里| NO2|\n",
"|2015/01/01| 61.0| 61.0| 45.0| 45.0| 42.0| 46.0| 51.0| 50.0| 59.0| 39.0| 20.0| 16.0| 15.0| 17.0| 16.0| 17.0| 20.0| 23.0| 26.0| 23.0| 24.0| 25.0| 21.0| 19.0| 大里| NOx|\n",
"|2015/01/01| 53.0| 55.0| 58.0| 53.0| 43.0| 36.0| 35.0| 42.0| 55.0| 64.0| 65.0| 59.0| 52.0| 44.0| 47.0| 41.0| 43.0| 40.0| 42.0| 35.0| 28.0| 20.0| 18.0| 16.0| 大里| PM2.5|\n",
"|2015/01/01| 81.0| 79.0| 82.0| 83.0| 83.0| 84.0| 83.0| 84.0| 78.0| 67.0| 53.0| 47.0| 43.0| 42.0| 42.0| 44.0| 47.0| 51.0| 54.0| 56.0| 57.0| 58.0| 59.0| 59.0| 大里| RH|\n",
"|2015/01/02| 14.0| 12.0| 11.0| 12.0| 10.0| 12.0| 12.0| 12.0| 12.0| 14.0| 19.0| 21.0| 22.0| 20.0| 22.0| 22.0| 20.0| 19.0| 19.0| 18.0| 18.0| 17.0| 16.0| 14.0| 大里| AMB_TEMP|\n",
"|2015/01/02| 17.0| 16.0| 19.0| 21.0| 19.0| 17.0| 19.0| 19.0| 24.0| 32.0| 28.0| 25.0| 23.0| 19.0| 16.0| 18.0| 22.0| 26.0| 35.0| 38.0| 44.0| 41.0| 37.0| 31.0| 大里| NO2|\n",
"|2015/01/02| 19.0| 17.0| 21.0| 23.0| 21.0| 18.0| 23.0| 37.0| 53.0| 71.0| 44.0| 33.0| 28.0| 22.0| 18.0| 19.0| 23.0| 27.0| 36.0| 41.0| 55.0| 68.0| 69.0| 56.0| 大里| NOx|\n",
"|2015/01/02| 32.0| 28.0| 29.0| 35.0| 46.0| 49.0| 40.0| 30.0| 25.0| 39.0| 51.0| 61.0| 61.0| 56.0| 63.0| 72.0| 83.0| 91.0| 89.0| 81.0| 72.0| 71.0| 84.0| 91.0| 大里| PM10|\n",
"|2015/01/02| 21.0| 22.0| 26.0| 23.0| 20.0| 18.0| 15.0| 21.0| 21.0| 25.0| 29.0| 32.0| 34.0| 29.0| 32.0| 39.0| 51.0| 51.0| 47.0| 43.0| 43.0| 48.0| 47.0| 53.0| 大里| PM2.5|\n",
"|2015/01/02| 63.0| 63.0| 67.0| 71.0| 73.0| 74.0| 73.0| 75.0| 71.0| 64.0| 56.0| 52.0| 50.0| 49.0| 48.0| 49.0| 50.0| 54.0| 56.0| 58.0| 60.0| 67.0| 70.0| 74.0| 大里| RH|\n",
"|2015/01/02| 51.0|241.0|168.0|161.0|167.0|163.0|152.0|251.0|160.0|269.0|335.0|307.0|346.0|111.0|279.0|262.0|313.0|314.0|309.0|358.0| 34.0|163.0|177.0|201.0| 大里|WIND_DIREC|\n",
"|2015/01/03| 15.0| 15.0| 14.0| 13.0| 13.0| 14.0| 15.0| 16.0| 19.0| 20.0| 23.0| 25.0| 28.0| 29.0| 29.0| 26.0| 22.0| 19.0| 18.0| 18.0| 21.0| 19.0| 20.0| 19.0| 大里| AMB_TEMP|\n",
"|2015/01/03| 31.0| 31.0| 31.0| 28.0| 28.0| 26.0| 22.0| 22.0| 29.0| 36.0| 45.0| 40.0| 25.0| 21.0| 24.0| 23.0| 28.0| 38.0| 38.0| 49.0| 42.0| 38.0| 36.0| 33.0| 大里| NO2|\n",
"|2015/01/03| 49.0| 46.0| 47.0| 47.0| 48.0| 47.0| 43.0| 51.0| 64.0| 65.0| 65.0| 50.0| 30.0| 25.0| 27.0| 26.0| 31.0| 40.0| 40.0| 55.0| 44.0| 46.0| 42.0| 37.0| 大里| NOx|\n",
"|2015/01/03| 96.0| 87.0| 78.0| 70.0| 65.0| 61.0| 54.0| 55.0| 65.0| 83.0| 96.0| 97.0|100.0|107.0|109.0|109.0|104.0|103.0|105.0|108.0|123.0|126.0|123.0|103.0| 大里| PM10|\n",
"|2015/01/03| 48.0| 48.0| 43.0| 38.0| 37.0| 36.0| 37.0| 34.0| 37.0| 46.0| 64.0| 77.0| 83.0| 75.0| 68.0| 69.0| 64.0| 65.0| 59.0| 66.0| 71.0| 66.0| 57.0| 48.0| 大里| PM2.5|\n",
"|2015/01/03| 73.0| 72.0| 73.0| 74.0| 75.0| 77.0| 80.0| 79.0| 75.0| 69.0| 62.0| 56.0| 52.0| 49.0| 50.0| 52.0| 53.0| 56.0| 58.0| 62.0| 64.0| 68.0| 71.0| 73.0| 大里| RH|\n",
"|2015/01/03|167.0| 96.0|189.0|241.0|189.0|168.0|167.0|271.0|272.0|270.0|282.0|219.0|244.0|271.0|263.0|273.0|283.0|342.0|346.0| 87.0| 41.0|104.0| 94.0|100.0| 大里| WD_HR|\n",
"|2015/01/03|123.0|208.0|231.0|239.0|151.0|169.0|215.0| 83.0|288.0|276.0| 18.0|244.0|268.0|291.0|265.0|279.0|318.0| 55.0| 29.0| 77.0| 34.0|101.0| 94.0| 96.0| 大里|WIND_DIREC|\n",
"+----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+----------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_pm10 = df.select(df.date.alias(\"datepm10\"),df.hr_09.alias(\"hr_09_pm10\"),df.hr_10.alias(\"hr_10_pm10\"), df.hr_11.alias(\"hr_11_pm10\"),df.hr_12.alias(\"hr_12_pm10\"), \"measure\").filter(df.measure==\"PM10\")\n",
"df_pm25 = df.select(df.date.alias(\"datepm25\"),df.hr_09.alias(\"hr_09_pm25\"),df.hr_10.alias(\"hr_10_pm25\"), df.hr_11.alias(\"hr_11_pm25\"),df.hr_12.alias(\"hr_12_pm25\"), \"measure\").filter(df.measure==\"PM2.5\") \n",
"df_AMB_TEMP = df.select(df.date.alias(\"dateAMB_TEMP\"),df.hr_09.alias(\"hr_09_AMB_TEMP\"),df.hr_10.alias(\"hr_10_AMB_TEMP\"), df.hr_11.alias(\"hr_11_AMB_TEMP\"),df.hr_12.alias(\"hr_12_AMB_TEMP\"), \"measure\").filter(df.measure==\"AMB_TEMP\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"traing_data = df_pm25\\\n",
" .join(df_pm10, df_pm25.datepm25==df_pm10.datepm10)\\\n",
" .join(df_AMB_TEMP, df_pm25.datepm25==df_AMB_TEMP.dateAMB_TEMP).drop(\"dateAMB_TEMP\").drop(\"measure\").drop(\"datepm10\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"| datepm25|hr_09_pm25|hr_10_pm25|hr_11_pm25|hr_12_pm25|hr_09_pm10|hr_10_pm10|hr_11_pm10|hr_12_pm10|hr_09_AMB_TEMP|hr_10_AMB_TEMP|hr_11_AMB_TEMP|hr_12_AMB_TEMP|\n",
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"|2015/08/22| 42.0| 33.0| 41.0| 53.0| 61.0| 71.0| 76.0| 89.0| 30.0| 32.0| 33.0| 33.0|\n",
"|2015/11/24| 13.0| 14.0| 15.0| 12.0| 30.0| 39.0| 36.0| 39.0| 26.0| 26.0| 27.0| 28.0|\n",
"|2015/03/01| 7.0| 8.0| 17.0| 13.0| 15.0| 19.0| 15.0| 30.0| 20.0| 21.0| 24.0| 22.0|\n",
"|2015/12/23| 43.0| 52.0| 59.0| 44.0| 79.0| 82.0| 87.0| 102.0| 21.0| 22.0| 24.0| 26.0|\n",
"|2015/04/30| 55.0| 47.0| 43.0| 41.0| 86.0| 86.0| 78.0| 83.0| 27.0| 30.0| 32.0| 33.0|\n",
"|2015/07/15| 38.0| 39.0| 44.0| 47.0| 46.0| 45.0| 58.0| 64.0| 29.0| 30.0| 31.0| 32.0|\n",
"|2015/01/15| 13.0| 16.0| 14.0| 8.0| 23.0| 35.0| 43.0| 35.0| 18.0| 20.0| 21.0| 23.0|\n",
"|2015/04/18| 44.0| 39.0| 32.0| 20.0| 94.0| 88.0| 83.0| 69.0| 28.0| 30.0| 31.0| 32.0|\n",
"|2015/04/24| 26.0| 28.0| 35.0| 39.0| 68.0| 78.0| 83.0| 76.0| 25.0| 26.0| 27.0| 28.0|\n",
"|2015/01/09| 18.0| 20.0| 14.0| 12.0| 28.0| 32.0| 43.0| 54.0| 16.0| 17.0| 18.0| 20.0|\n",
"|2015/01/06| 42.0| 47.0| 38.0| 24.0| 40.0| 49.0| 61.0| 56.0| 21.0| 25.0| 29.0| 31.0|\n",
"|2015/10/19| 31.0| 43.0| 53.0| 44.0| 42.0| 56.0| 71.0| 88.0| 25.0| 27.0| 28.0| 29.0|\n",
"|2015/01/19| 18.0| 30.0| 26.0| 28.0| 33.0| 45.0| 59.0| 61.0| 14.0| 16.0| 19.0| 20.0|\n",
"|2015/02/03| 26.0| 31.0| 28.0| 20.0| 70.0| 61.0| 66.0| 62.0| 19.0| 20.0| 21.0| 21.0|\n",
"|2015/04/20| 15.0| 17.0| 23.0| 21.0| 18.0| 25.0| 30.0| 43.0| 28.0| 30.0| 31.0| 31.0|\n",
"|2015/03/10| 23.0| 21.0| 26.0| 24.0| 38.0| 47.0| 60.0| 63.0| 17.0| 17.0| 17.0| 18.0|\n",
"|2015/01/02| 21.0| 25.0| 29.0| 32.0| 25.0| 39.0| 51.0| 61.0| 12.0| 14.0| 19.0| 21.0|\n",
"|2015/01/21| 46.0| 36.0| 33.0| 33.0| 101.0| 109.0| 98.0| 83.0| 17.0| 18.0| 19.0| 20.0|\n",
"|2015/12/08| 13.0| 14.0| 18.0| 14.0| 16.0| 18.0| 27.0| 51.0| 21.0| 22.0| 23.0| 24.0|\n",
"|2015/03/16| 60.0| 70.0| 73.0| 65.0| 89.0| 98.0| 116.0| 124.0| 22.0| 23.0| 24.0| 26.0|\n",
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"traing_data.show()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.9423851890511278"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"traing_data.corr(\"hr_09_pm25\", \"hr_10_pm25\")"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"| datepm25|hr_09_pm25|hr_10_pm25|hr_11_pm25|hr_12_pm25|hr_09_pm10|hr_10_pm10|hr_11_pm10|hr_12_pm10|hr_09_AMB_TEMP|hr_10_AMB_TEMP|hr_11_AMB_TEMP|hr_12_AMB_TEMP|\n",
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"|2015/08/22| 42.0| 33.0| 41.0| 53.0| 61.0| 71.0| 76.0| 89.0| 30.0| 32.0| 33.0| 33.0|\n",
"|2015/11/24| 13.0| 14.0| 15.0| 12.0| 30.0| 39.0| 36.0| 39.0| 26.0| 26.0| 27.0| 28.0|\n",
"|2015/03/01| 7.0| 8.0| 17.0| 13.0| 15.0| 19.0| 15.0| 30.0| 20.0| 21.0| 24.0| 22.0|\n",
"|2015/12/23| 43.0| 52.0| 59.0| 44.0| 79.0| 82.0| 87.0| 102.0| 21.0| 22.0| 24.0| 26.0|\n",
"|2015/04/30| 55.0| 47.0| 43.0| 41.0| 86.0| 86.0| 78.0| 83.0| 27.0| 30.0| 32.0| 33.0|\n",
"|2015/07/15| 38.0| 39.0| 44.0| 47.0| 46.0| 45.0| 58.0| 64.0| 29.0| 30.0| 31.0| 32.0|\n",
"|2015/01/15| 13.0| 16.0| 14.0| 8.0| 23.0| 35.0| 43.0| 35.0| 18.0| 20.0| 21.0| 23.0|\n",
"|2015/04/18| 44.0| 39.0| 32.0| 20.0| 94.0| 88.0| 83.0| 69.0| 28.0| 30.0| 31.0| 32.0|\n",
"|2015/04/24| 26.0| 28.0| 35.0| 39.0| 68.0| 78.0| 83.0| 76.0| 25.0| 26.0| 27.0| 28.0|\n",
"|2015/01/09| 18.0| 20.0| 14.0| 12.0| 28.0| 32.0| 43.0| 54.0| 16.0| 17.0| 18.0| 20.0|\n",
"|2015/01/06| 42.0| 47.0| 38.0| 24.0| 40.0| 49.0| 61.0| 56.0| 21.0| 25.0| 29.0| 31.0|\n",
"|2015/10/19| 31.0| 43.0| 53.0| 44.0| 42.0| 56.0| 71.0| 88.0| 25.0| 27.0| 28.0| 29.0|\n",
"|2015/01/19| 18.0| 30.0| 26.0| 28.0| 33.0| 45.0| 59.0| 61.0| 14.0| 16.0| 19.0| 20.0|\n",
"|2015/02/03| 26.0| 31.0| 28.0| 20.0| 70.0| 61.0| 66.0| 62.0| 19.0| 20.0| 21.0| 21.0|\n",
"|2015/04/20| 15.0| 17.0| 23.0| 21.0| 18.0| 25.0| 30.0| 43.0| 28.0| 30.0| 31.0| 31.0|\n",
"|2015/03/10| 23.0| 21.0| 26.0| 24.0| 38.0| 47.0| 60.0| 63.0| 17.0| 17.0| 17.0| 18.0|\n",
"|2015/01/02| 21.0| 25.0| 29.0| 32.0| 25.0| 39.0| 51.0| 61.0| 12.0| 14.0| 19.0| 21.0|\n",
"|2015/01/21| 46.0| 36.0| 33.0| 33.0| 101.0| 109.0| 98.0| 83.0| 17.0| 18.0| 19.0| 20.0|\n",
"|2015/12/08| 13.0| 14.0| 18.0| 14.0| 16.0| 18.0| 27.0| 51.0| 21.0| 22.0| 23.0| 24.0|\n",
"|2015/03/16| 60.0| 70.0| 73.0| 65.0| 89.0| 98.0| 116.0| 124.0| 22.0| 23.0| 24.0| 26.0|\n",
"+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"traing_data.show()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql import functions as F\n",
"formulated_traning_data = traing_data.select(\"*\", F.when(traing_data.hr_12_pm25 > 50, 1).otherwise(0)).withColumnRenamed(\"CASE WHEN (hr_12_pm25 > 50) THEN 1 ELSE 0 END\", \"pm25_condiction\").\\\n",
"drop(\"hr_12_AMB_TEMP\").drop(\"hr_12_pm10\").drop(\"hr_12_pm25\").drop(\"datepm25\")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+---------------+\n",
"|hr_09_pm25|hr_10_pm25|hr_11_pm25|hr_09_pm10|hr_10_pm10|hr_11_pm10|hr_09_AMB_TEMP|hr_10_AMB_TEMP|hr_11_AMB_TEMP|pm25_condiction|\n",
"+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+---------------+\n",
"| 42.0| 33.0| 41.0| 61.0| 71.0| 76.0| 30.0| 32.0| 33.0| 1|\n",
"| 13.0| 14.0| 15.0| 30.0| 39.0| 36.0| 26.0| 26.0| 27.0| 0|\n",
"| 7.0| 8.0| 17.0| 15.0| 19.0| 15.0| 20.0| 21.0| 24.0| 0|\n",
"| 43.0| 52.0| 59.0| 79.0| 82.0| 87.0| 21.0| 22.0| 24.0| 0|\n",
"| 55.0| 47.0| 43.0| 86.0| 86.0| 78.0| 27.0| 30.0| 32.0| 0|\n",
"| 38.0| 39.0| 44.0| 46.0| 45.0| 58.0| 29.0| 30.0| 31.0| 0|\n",
"| 13.0| 16.0| 14.0| 23.0| 35.0| 43.0| 18.0| 20.0| 21.0| 0|\n",
"| 44.0| 39.0| 32.0| 94.0| 88.0| 83.0| 28.0| 30.0| 31.0| 0|\n",
"| 26.0| 28.0| 35.0| 68.0| 78.0| 83.0| 25.0| 26.0| 27.0| 0|\n",
"| 18.0| 20.0| 14.0| 28.0| 32.0| 43.0| 16.0| 17.0| 18.0| 0|\n",
"| 42.0| 47.0| 38.0| 40.0| 49.0| 61.0| 21.0| 25.0| 29.0| 0|\n",
"| 31.0| 43.0| 53.0| 42.0| 56.0| 71.0| 25.0| 27.0| 28.0| 0|\n",
"| 18.0| 30.0| 26.0| 33.0| 45.0| 59.0| 14.0| 16.0| 19.0| 0|\n",
"| 26.0| 31.0| 28.0| 70.0| 61.0| 66.0| 19.0| 20.0| 21.0| 0|\n",
"| 15.0| 17.0| 23.0| 18.0| 25.0| 30.0| 28.0| 30.0| 31.0| 0|\n",
"| 23.0| 21.0| 26.0| 38.0| 47.0| 60.0| 17.0| 17.0| 17.0| 0|\n",
"| 21.0| 25.0| 29.0| 25.0| 39.0| 51.0| 12.0| 14.0| 19.0| 0|\n",
"| 46.0| 36.0| 33.0| 101.0| 109.0| 98.0| 17.0| 18.0| 19.0| 0|\n",
"| 13.0| 14.0| 18.0| 16.0| 18.0| 27.0| 21.0| 22.0| 23.0| 0|\n",
"| 60.0| 70.0| 73.0| 89.0| 98.0| 116.0| 22.0| 23.0| 24.0| 1|\n",
"+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+---------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"formulated_traning_data.show()"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.mllib.regression import LabeledPoint\n",
"\n",
"LabelPoints = formulated_traning_data.rdd\\\n",
" .map(lambda r: LabeledPoint(r.pm25_condiction, \\\n",
" [r.hr_09_pm25, r.hr_10_pm25, r.hr_11_pm25, \\\n",
" r.hr_09_pm10, r.hr_10_pm10, r.hr_11_pm10, \\\n",
" r.hr_09_AMB_TEMP, r.hr_10_AMB_TEMP, r.hr_11_AMB_TEMP]))\n"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"(trainData,validationData,testData) = LabelPoints.randomSplit([6,0,4])"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.mllib.tree import DecisionTree\n",
"DTModel = DecisionTree.trainClassifier(trainData,\n",
" numClasses=2,\n",
" categoricalFeaturesInfo={},\n",
" impurity=\"entropy\",\n",
" maxDepth=3,\n",
" maxBins=5)"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"prediction = DTModel.predict(testData.map(lambda x: x.features))"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"predictionAndLabels = prediction.zip(testData.map(lambda x: x.label))"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(0.0, 1.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 1.0),\n",
" (0.0, 0.0),\n",
" (1.0, 0.0),\n",
" (1.0, 1.0),\n",
" (1.0, 1.0),\n",
" (1.0, 1.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 1.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 1.0),\n",
" (1.0, 1.0),\n",
" (1.0, 1.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (1.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (1.0, 0.0),\n",
" (0.0, 1.0),\n",
" (1.0, 1.0),\n",
" (0.0, 0.0),\n",
" (1.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0),\n",
" (0.0, 0.0)]"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predictionAndLabels.collect()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.mllib.evaluation import BinaryClassificationMetrics\n",
"metrics = BinaryClassificationMetrics(predictionAndLabels)"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.580316742081\n"
]
}
],
"source": [
"print metrics.areaUnderROC"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2 with Spark 2.0",
"language": "python",
"name": "python2-spark20"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment