Run this notebook on https://app.blazingsql.com
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### **Download Data**"
]
},
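{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cells below pull four datasets from the gumdropsteve/datasets GitHub repo: green taxi trips (one sample month per year, 2013-2019), yellow taxi trips (January of each year, 2009-2019), Uber pickups (April-September 2014 plus January-June 2015), and the NYC taxi zones shapefile used for the spatial joins later on."
]
},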
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import urllib.request  # note: `import urllib` alone does not expose urllib.request"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# tag data dir & sub dirs\n",
"data_dir = 'raw_data/'\n",
"taxi_sub_dir = 'taxi/'\n",
"uber_sub_dir = 'uber/'\n",
"t_zones_sub_dir = 'zones/'\n",
"\n",
"# create directories if they don't exist\n",
"for d in [data_dir, f'{data_dir}{taxi_sub_dir}', f'{data_dir}{uber_sub_dir}', f'{data_dir}{t_zones_sub_dir}']:\n",
"    os.makedirs(d, exist_ok=True)\n",
"\n",
"# (raw) url to directory we're downloading\n",
"base_url = 'https://raw.githubusercontent.com/gumdropsteve/datasets/master/nyc_transport/'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download green taxi\n",
"for file in [f'green_tripdata_{f}.csv' for f in ['2013-08', '2014-01', '2015-01', '2016-01', '2017-01', '2018-01', '2019-01']]:\n",
"    if not os.path.isfile(data_dir + taxi_sub_dir + file):\n",
"        print(f'Downloading {base_url + taxi_sub_dir + file} to {data_dir + taxi_sub_dir + file}')\n",
"        urllib.request.urlretrieve(base_url + taxi_sub_dir + file, data_dir + taxi_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download yellow taxi (january of each year, 2009-2019)\n",
"for file in [f'yellow_tripdata_20{y:02d}-01.csv' for y in range(9, 20)]:\n",
"    if not os.path.isfile(data_dir + taxi_sub_dir + file):\n",
"        print(f'Downloading {base_url + taxi_sub_dir + file} to {data_dir + taxi_sub_dir + file}')\n",
"        urllib.request.urlretrieve(base_url + taxi_sub_dir + file, data_dir + taxi_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download uber data (2014)\n",
"for file in [f'uber-raw-data-{month}14.csv' for month in ['apr', 'aug', 'jul', 'jun', 'may', 'sep']]:\n",
"    if not os.path.isfile(data_dir + uber_sub_dir + file):\n",
"        print(f'Downloading {base_url + uber_sub_dir + file} to {data_dir + uber_sub_dir + file}')\n",
"        urllib.request.urlretrieve(base_url + uber_sub_dir + file, data_dir + uber_sub_dir + file)\n",
"\n",
"# download uber data (2015)\n",
"file = 'uber-raw-data-janjune-15.csv'\n",
"if not os.path.isfile(data_dir + uber_sub_dir + file):\n",
"    print(f'Downloading {base_url + uber_sub_dir + file} to {data_dir + uber_sub_dir + file}')\n",
"    urllib.request.urlretrieve(base_url + uber_sub_dir + file, data_dir + uber_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download taxi zones shapefile(s)\n",
"for file in [f'cu_taxi_zones.{f}' for f in ['cpg', 'dbf', 'prj', 'shp', 'shx']]:\n",
"    if not os.path.isfile(data_dir + t_zones_sub_dir + file):\n",
"        print(f'Downloading {base_url + t_zones_sub_dir + file} to {data_dir + t_zones_sub_dir + file}')\n",
"        urllib.request.urlretrieve(base_url + t_zones_sub_dir + file, data_dir + t_zones_sub_dir + file)"
]
},
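{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check (a minimal sketch, not part of the original flow; it assumes the directory variables above are still in scope): list what landed in each sub-directory before moving on."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# confirm each sub-directory now holds the expected files\n",
"for sub_dir in [taxi_sub_dir, uber_sub_dir, t_zones_sub_dir]:\n",
"    print(sub_dir, sorted(os.listdir(data_dir + sub_dir)))"
]
},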
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Convert Taxi to Parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import dask_cudf\n",
"import cuspatial\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dtype_list = {'dropoff_datetime': 'str',\n",
"              'dropoff_latitude': 'float64',\n",
"              'dropoff_taxizone_id': 'float64',\n",
"              'dropoff_longitude': 'float64',\n",
"              'ehail_fee': 'float64',\n",
"              'extra': 'float64',\n",
"              'fare_amount': 'float64',\n",
"              'improvement_surcharge': 'float64',\n",
"              'junk1': 'str',\n",
"              'junk2': 'str',\n",
"              'mta_tax': 'float64',\n",
"              'passenger_count': 'str',\n",
"              'payment_type': 'str',\n",
"              'pickup_datetime': 'str',\n",
"              'pickup_latitude': 'float64',\n",
"              'pickup_taxizone_id': 'float64',\n",
"              'pickup_longitude': 'float64',\n",
"              'rate_code_id': 'str',\n",
"              'store_and_fwd_flag': 'str',\n",
"              'tip_amount': 'float64',\n",
"              'tolls_amount': 'float64',\n",
"              'total_amount': 'float64',\n",
"              'trip_distance': 'float64',\n",
"              'trip_type': 'str',\n",
"              'vendor_id': 'str',\n",
"              }\n",
"\n",
"# make dict of paths to data directories\n",
"relative_path = 'raw_data'\n",
"config = {'citibike_raw_data_path': f'{relative_path}/bike/',\n",
"          'taxi_raw_data_path': f'{relative_path}/taxi/',\n",
"          'uber_raw_data_path': f'{relative_path}/uber/',\n",
"          'subway_raw_data_path': f'{relative_path}/subway/',\n",
"          'parquet_output_path': 'data/'\n",
"          }\n",
"\n",
"def glob(x):\n",
"    '''\n",
"    Return a sorted list of paths matching a shell-style pathname pattern\n",
"    (a thin wrapper around glob.glob that keeps file order deterministic).\n",
"    '''\n",
"    from glob import glob\n",
"    return sorted(glob(x))"
]
},
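{
"cell_type": "markdown",
"metadata": {},
"source": [
"The TLC changed the trip-record layout several times, which is why `get_green()` and `get_yellow()` below read each era with its own column list: `improvement_surcharge` first appears in the 2015 files, and from the second half of 2016 the files report `pickup_taxizone_id`/`dropoff_taxizone_id` instead of raw latitude/longitude. Each era's frame is padded with placeholders (`-1.0` zone ids, `0.0` coordinates, `NaN` fees) so the frames concatenate into one consistent schema."
]
},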
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_green():\n",
"    # tag file paths to data and column names by schema era\n",
"    green_schema_pre_2015 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,junk1,junk2\"\n",
"    green_glob_pre_2015 = glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_201[34]*.csv'))\n",
"\n",
"    green_schema_2015_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2\"\n",
"    green_glob_2015_h1 = glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-0[1-6].csv'))\n",
"\n",
"    green_schema_2015_h2_2016_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type\"\n",
"    green_glob_2015_h2_2016_h1 = glob(os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-0[7-9].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-1[0-2].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-0[1-6].csv'))\n",
"\n",
"    green_schema_2016_h2_plus = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_taxizone_id,dropoff_taxizone_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2\"\n",
"    green_glob_2016_h2_plus = glob(os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-0[7-9].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-1[0-2].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'green_tripdata_201[7-9]*.csv'))\n",
"\n",
"    # before 2015 dataframe\n",
"    green1 = dask_cudf.read_csv(green_glob_pre_2015,\n",
"                                header=0,\n",
"                                na_values=[\"NA\"],\n",
"                                parse_dates=[1, 2],\n",
"                                infer_datetime_format=True,\n",
"                                dtype=dtype_list,\n",
"                                names=green_schema_pre_2015.split(','))\n",
"    green1['dropoff_taxizone_id'] = -1.0\n",
"    green1['pickup_taxizone_id'] = -1.0\n",
"    green1['improvement_surcharge'] = np.nan\n",
"    green1 = green1.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"    # january 2015 - june 2015 dataframe\n",
"    green2 = dask_cudf.read_csv(green_glob_2015_h1,\n",
"                                header=0,\n",
"                                na_values=[\"NA\"],\n",
"                                parse_dates=[1, 2],\n",
"                                infer_datetime_format=True,\n",
"                                dtype=dtype_list,\n",
"                                names=green_schema_2015_h1.split(','))\n",
"    green2['dropoff_taxizone_id'] = -1.0\n",
"    green2['pickup_taxizone_id'] = -1.0\n",
"    green2 = green2.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"    # july 2015 - june 2016 dataframe\n",
"    green3 = dask_cudf.read_csv(green_glob_2015_h2_2016_h1,\n",
"                                header=0,\n",
"                                na_values=[\"NA\"],\n",
"                                parse_dates=[1, 2],\n",
"                                infer_datetime_format=True,\n",
"                                dtype=dtype_list,\n",
"                                names=green_schema_2015_h2_2016_h1.split(','))\n",
"    green3['dropoff_taxizone_id'] = -1.0\n",
"    green3['pickup_taxizone_id'] = -1.0\n",
"\n",
"    # july 2016 or later dataframe\n",
"    green4 = dask_cudf.read_csv(green_glob_2016_h2_plus,\n",
"                                header=0,\n",
"                                na_values=[\"NA\"],\n",
"                                parse_dates=[1, 2],\n",
"                                infer_datetime_format=True,\n",
"                                dtype=dtype_list,\n",
"                                names=green_schema_2016_h2_plus.split(','))\n",
"    green4['dropoff_latitude'] = 0.0\n",
"    green4['dropoff_longitude'] = 0.0\n",
"    green4['pickup_latitude'] = 0.0\n",
"    green4['pickup_longitude'] = 0.0\n",
"    green4 = green4.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"    # combine dataframes (each aligned to green1's column order)\n",
"    green = dask_cudf.concat([green1[sorted(green1.columns)],\n",
"                              green2[sorted(green1.columns)],\n",
"                              green3[sorted(green1.columns)],\n",
"                              green4[sorted(green1.columns)]])\n",
"    for field in list(green.columns):\n",
"        if field in dtype_list:\n",
"            green[field] = green[field].astype(dtype_list[field])\n",
"\n",
"    # tag the service so trips remain identifiable after the full concat\n",
"    green['trip_type'] = 'green'\n",
"\n",
"    return green"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_yellow():\n",
"    # tag file paths to data and column names by schema era (x < 2015, 2015 <= x < 2016.5, 2016.5 <= x)\n",
"    yellow_schema_pre_2015 = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount\"\n",
"    yellow_glob_pre_2015 = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_201[0-4]*.csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2009*.csv'))\n",
"    yellow_schema_2015_2016_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\"\n",
"    yellow_glob_2015_2016_h1 = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2015*.csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-0[1-6].csv'))\n",
"    yellow_schema_2016_h2_plus = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_taxizone_id,dropoff_taxizone_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,junk1,junk2\"\n",
"    yellow_glob_2016_h2_plus = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-0[7-9].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-1[0-2].csv')) + glob(\n",
"        os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_201[7-9]*.csv'))\n",
"\n",
"    # create pre 2015 dataframe\n",
"    yellow1 = dask_cudf.read_csv(yellow_glob_pre_2015,\n",
"                                 header=0,\n",
"                                 na_values=[\"NA\"],\n",
"                                 parse_dates=[1, 2],\n",
"                                 infer_datetime_format=True,\n",
"                                 dtype=dtype_list,\n",
"                                 names=yellow_schema_pre_2015.split(','))\n",
"    yellow1['dropoff_taxizone_id'] = -1.0\n",
"    yellow1['pickup_taxizone_id'] = -1.0\n",
"    yellow1['ehail_fee'] = np.nan\n",
"    yellow1['improvement_surcharge'] = np.nan\n",
"    yellow1['improvement_surcharge'] = yellow1['improvement_surcharge'].astype('float64')  # keep dtype consistent with dtype_list\n",
"    yellow1['trip_type'] = -1.0\n",
"\n",
"    # create january 2015 - june 2016 dataframe\n",
"    yellow2 = dask_cudf.read_csv(yellow_glob_2015_2016_h1,\n",
"                                 header=0,\n",
"                                 na_values=[\"NA\"],\n",
"                                 parse_dates=[1, 2],\n",
"                                 infer_datetime_format=True,\n",
"                                 dtype=dtype_list,\n",
"                                 names=yellow_schema_2015_2016_h1.split(','))\n",
"    yellow2['dropoff_taxizone_id'] = -1.0\n",
"    yellow2['pickup_taxizone_id'] = -1.0\n",
"    yellow2['ehail_fee'] = np.nan\n",
"    yellow2['trip_type'] = -1.0\n",
"\n",
"    # create post june 2016 dataframe\n",
"    yellow3 = dask_cudf.read_csv(yellow_glob_2016_h2_plus,\n",
"                                 header=0,\n",
"                                 na_values=[\"NA\"],\n",
"                                 parse_dates=[1, 2],\n",
"                                 infer_datetime_format=True,\n",
"                                 dtype=dtype_list,\n",
"                                 names=yellow_schema_2016_h2_plus.split(','))\n",
"    yellow3['dropoff_latitude'] = 0.0\n",
"    yellow3['dropoff_longitude'] = 0.0\n",
"    yellow3['pickup_latitude'] = 0.0\n",
"    yellow3['pickup_longitude'] = 0.0\n",
"    yellow3['ehail_fee'] = np.nan\n",
"    yellow3['trip_type'] = -1.0\n",
"    yellow3 = yellow3.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"    # combine dataframes (each aligned to yellow1's column order)\n",
"    yellow = dask_cudf.concat([yellow1[sorted(yellow1.columns)],\n",
"                               yellow2[sorted(yellow1.columns)],\n",
"                               yellow3[sorted(yellow1.columns)]])\n",
"    for field in list(yellow.columns):\n",
"        if field in dtype_list:\n",
"            yellow[field] = yellow[field].astype(dtype_list[field])\n",
"\n",
"    yellow['trip_type'] = 'yellow'\n",
"\n",
"    return yellow"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_uber():\n",
"    uber_schema_2014 = \"pickup_datetime,pickup_latitude,pickup_longitude,junk1\"\n",
"    uber_glob_2014 = glob(os.path.join(config['uber_raw_data_path'], 'uber*-???14.csv'))\n",
"\n",
"    uber1 = dask_cudf.read_csv(uber_glob_2014,\n",
"                               header=0,\n",
"                               na_values=[\"NA\"],\n",
"                               parse_dates=[0],\n",
"                               infer_datetime_format=True,\n",
"                               dtype=dtype_list,\n",
"                               names=uber_schema_2014.split(','))\n",
"    uber1 = uber1.drop(['junk1'], axis=1)\n",
"    uber1 = uber1.assign(pickup_taxizone_id=-1.0)\n",
"\n",
"    uber_schema_2015 = \"junk1,pickup_datetime,junk2,pickup_taxizone_id\"\n",
"    uber_glob_2015 = glob(os.path.join(config['uber_raw_data_path'], 'uber*15.csv'))\n",
"\n",
"    uber2 = dask_cudf.read_csv(uber_glob_2015,\n",
"                               header=0,\n",
"                               na_values=[\"NA\"],\n",
"                               parse_dates=[1],\n",
"                               infer_datetime_format=True,\n",
"                               dtype=dtype_list,\n",
"                               names=uber_schema_2015.split(','))\n",
"    uber2 = uber2.drop(['junk1', 'junk2'], axis=1)\n",
"    uber2 = uber2.assign(pickup_latitude=0.0, pickup_longitude=0.0)\n",
"\n",
"    uberdf = dask_cudf.concat([uber1[sorted(uber1.columns)],\n",
"                               uber2[sorted(uber1.columns)]])\n",
"    # add any columns the uber files lack so the schema matches the taxi frames\n",
"    for field in dtype_list:\n",
"        if field in uberdf.columns:\n",
"            uberdf[field] = uberdf[field].astype(dtype_list[field])\n",
"        elif field == 'pickup_datetime':\n",
"            pass\n",
"        else:\n",
"            uberdf[field] = np.nan\n",
"            uberdf[field] = uberdf[field].astype(dtype_list[field])\n",
"\n",
"    uberdf = uberdf.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"    # uberdf['dropoff_datetime'] = np.datetime64(\"1970-01-01 00:00:00\")\n",
"    # uberdf = uberdf.repartition(npartitions=20)\n",
"\n",
"    uberdf['trip_type'] = 'uber'\n",
"\n",
"    uberdf = uberdf[sorted(uberdf.columns)]\n",
"\n",
"    return uberdf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"green = get_green()\n",
"yellow = get_yellow()\n",
"uber = get_uber()\n",
"\n",
"all_trips = uber.append(green).append(yellow)"
]
},
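{
"cell_type": "markdown",
"metadata": {},
"source": [
"`assign_taxi_zones` below batches the 263 taxi zones 31 at a time because `cuspatial.point_in_polygon` (the legacy API used here) tests at most 31 polygons per call, packing its results into a 32-bit bitmask; `pip_iterations` holds the batch boundaries. (That 31-polygon limit reflects the cuspatial releases current when this notebook was written.)"
]
},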
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# batch boundaries for point_in_polygon across the 263 taxi zones (steps of 31)\n",
"pip_iterations = list(np.arange(0, 263, 31))\n",
"pip_iterations.append(263)\n",
"\n",
"taxi_zones = cuspatial.read_polygon_shapefile(f'{data_dir}{t_zones_sub_dir}cu_taxi_zones.shp')\n",
"\n",
"def assign_taxi_zones(df, lon_var, lat_var, locid_var):\n",
"    \"\"\"\n",
"    Derives taxi zones from the zones shapefile.\n",
"\n",
"    This function takes longitude values provided by `lon_var` and latitude\n",
"    values provided by `lat_var` in DataFrame `df`, and performs a spatial\n",
"    join against the NYC taxi_zones shapefile.\n",
"\n",
"    The shapefile is hard coded in, as this function makes a hard assumption\n",
"    about the latitude and longitude coordinates. It also assumes that\n",
"    latitude=0.0, longitude=0.0 cannot be a real datapoint in your dataset,\n",
"    which is reasonable for New York but not for a global dataset.\n",
"\n",
"    Only rows with nonzero coordinates and `locid_var` equal to -1.0 are\n",
"    updated.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    df : cudf.DataFrame or dask_cudf.DataFrame\n",
"        DataFrame containing latitude, longitude, and location id columns.\n",
"    lon_var : string\n",
"        Name of column in `df` containing longitude values. Missing values\n",
"        should be NaN or 0.0.\n",
"    lat_var : string\n",
"        Name of column in `df` containing latitude values. Missing values\n",
"        should be NaN or 0.0.\n",
"    locid_var : string\n",
"        Name of column in `df` containing taxi zone location ids. Rows with\n",
"        values other than -1.0 are not overwritten.\n",
"    \"\"\"\n",
"    # focus location columns\n",
"    localdf = df[[lon_var, lat_var, locid_var]].copy()\n",
"    # localdf = localdf.reset_index()\n",
"\n",
"    # fill missing lat/long values\n",
"    localdf[lon_var] = localdf[lon_var].fillna(value=0.0)\n",
"    localdf[lat_var] = localdf[lat_var].fillna(value=0.0)\n",
"\n",
"    # (bool column) is the location id missing and do we have lat/long coordinates?\n",
"    localdf['replace_locid'] = ((localdf[locid_var] == -1.0)\n",
"                                & (localdf[lon_var] != 0.0)\n",
"                                & (localdf[lat_var] != 0.0))\n",
"\n",
"    # are there any values to replace?\n",
"    if np.any(localdf['replace_locid']):  # makes ~28.469% faster\n",
"        # go through zones 31 at a time (point_in_polygon's per-call limit)\n",
"        for i in range(len(pip_iterations) - 1):\n",
"            # tag 1st and last zone #s\n",
"            start = pip_iterations[i]\n",
"            end = pip_iterations[i + 1]\n",
"            # derive taxi zones from coordinates\n",
"            t_zones = cuspatial.point_in_polygon(localdf[lon_var],\n",
"                                                 localdf[lat_var],\n",
"                                                 taxi_zones[0][start:end],\n",
"                                                 taxi_zones[1],\n",
"                                                 taxi_zones[2]['x'],\n",
"                                                 taxi_zones[2]['y'])\n",
"            # insert taxi zones into location id columns\n",
"            for j in t_zones.columns:\n",
"                localdf[locid_var].loc[t_zones[j]] = j\n",
"\n",
"        return localdf[locid_var].astype('float64')\n",
"\n",
"    else:\n",
"        localdf[locid_var] = localdf[locid_var].astype('float64')\n",
"        return localdf[locid_var]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# derive & assign pickup & dropoff taxi zones\n",
"all_trips['dropoff_taxizone_id'] = all_trips.map_partitions(assign_taxi_zones,\n",
"                                                            lon_var='dropoff_longitude',\n",
"                                                            lat_var='dropoff_latitude',\n",
"                                                            locid_var='dropoff_taxizone_id',\n",
"                                                            meta=('dropoff_taxizone_id', np.float64))\n",
"all_trips['pickup_taxizone_id'] = all_trips.map_partitions(assign_taxi_zones,\n",
"                                                           lon_var='pickup_longitude',\n",
"                                                           lat_var='pickup_latitude',\n",
"                                                           locid_var='pickup_taxizone_id',\n",
"                                                           meta=('pickup_taxizone_id', np.float64))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_trips = all_trips[sorted(all_trips.columns)]\n",
"# all_trips = all_trips.repartition(npartitions=1200)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sort each partition by pickup time (per-partition, not a global sort;\n",
"# the SQL query below handles the global ORDER BY)\n",
"all_trips = all_trips.map_partitions(lambda x: x.sort_values('pickup_datetime'),\n",
"                                     meta=all_trips)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for fieldName in all_trips.columns:\n",
"    if fieldName in dtype_list:\n",
"        all_trips[fieldName] = all_trips[fieldName].astype(dtype_list[fieldName])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_trips.columns"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"from dask.distributed import Client\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from blazingsql import BlazingContext\n",
"\n",
"bc = BlazingContext(dask_client=client, network_interface='lo')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"bc.create_table('full_taxi', all_trips)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %%time\n",
"# q = bc.sql('SELECT COUNT(*) FROM full_taxi')\n",
"\n",
"# type(q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %%time\n",
"# q.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"query = '''\n",
"        SELECT\n",
"            *\n",
"        FROM\n",
"            full_taxi\n",
"        WHERE\n",
"            dropoff_taxizone_id IS NOT NULL\n",
"            AND pickup_taxizone_id IS NOT NULL\n",
"            AND dropoff_taxizone_id <> -1.0\n",
"            AND pickup_taxizone_id <> -1.0\n",
"        ORDER BY\n",
"            pickup_datetime\n",
"        '''\n",
"df = bc.sql(query)\n",
"\n",
"type(df), len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"df.compute().to_csv('full_taxi.csv', index=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"df.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.sum(df.tip_amount.isna().compute())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bc.sql('select * from full_taxi')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cols = 'pickup_taxizone_id, dropoff_taxizone_id, trip_distance, passenger_count, fare_amount, total_amount - fare_amount as tax, tip_amount'\n",
"bc.sql(f'select {cols} from full_taxi where pickup_taxizone_id = 200 and year(cast(pickup_datetime as timestamp)) between 2014 and 2018')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "RAPIDS Stable",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
} |