@gumdropsteve
Created December 3, 2020 09:19
run this notebook on https://app.blazingsql.com
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### **Download Data**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import urllib"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# tag data dir & sub dirs\n",
"data_dir = 'raw_data/'\n",
"taxi_sub_dir = 'taxi/'\n",
"uber_sub_dir = 'uber/'\n",
"t_zones_sub_dir = 'zones/'\n",
"\n",
"# create directories if they don't exist\n",
"for d in [data_dir, f'{data_dir}/{taxi_sub_dir}', f'{data_dir}/{uber_sub_dir}', f'{data_dir}/{t_zones_sub_dir}']:\n",
" if not os.path.exists(d):\n",
" os.system(f'mkdir {d}')\n",
" \n",
"# (raw) url to directory we're downloading\n",
"base_url = 'https://raw.githubusercontent.com/gumdropsteve/datasets/master/nyc_transport/'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download green taxi\n",
"for file in [f'green_tripdata_{f}.csv' for f in ['2013-08', '2014-01', '2015-01', '2016-01', '2017-01', '2018-01', '2019-01']]:\n",
" if not os.path.isfile(data_dir + taxi_sub_dir + file):\n",
" print(f'Downloading {base_url + taxi_sub_dir + file} to {data_dir + taxi_sub_dir + file}')\n",
" urllib.request.urlretrieve(base_url + taxi_sub_dir + file, data_dir + taxi_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download yellow taxi\n",
"for file in [f'yellow_tripdata_20{f}-01.csv' for f in ['09', 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]:\n",
" if not os.path.isfile(data_dir + taxi_sub_dir + file):\n",
" print(f'Downloading {base_url + taxi_sub_dir + file} to {data_dir + taxi_sub_dir + file}')\n",
" urllib.request.urlretrieve(base_url + taxi_sub_dir + file, data_dir + taxi_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download uber taxi (2014)\n",
"for file in [f'uber-raw-data-{month}14.csv' for month in ['apr', 'aug', 'jul', 'jun', 'may', 'sep']]:\n",
" if not os.path.isfile(data_dir + uber_sub_dir + file):\n",
" print(f'Downloading {base_url + uber_sub_dir + file} to {data_dir + uber_sub_dir + file}')\n",
" urllib.request.urlretrieve(base_url + uber_sub_dir + file, data_dir + uber_sub_dir + file)\n",
" \n",
"# download uber taxi (2015)\n",
"file = 'uber-raw-data-janjune-15.csv'\n",
"if not os.path.isfile(data_dir + uber_sub_dir + file):\n",
" print(f'Downloading {base_url + uber_sub_dir + file} to {data_dir + uber_sub_dir + file}')\n",
" urllib.request.urlretrieve(base_url + uber_sub_dir + file, data_dir + uber_sub_dir + file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# download taxi zones shapefile(s)\n",
"for file in [f'cu_taxi_zones.{f}' for f in ['cpg', 'dbf', 'prj', 'shp', 'shx']]:\n",
" if not os.path.isfile(data_dir + t_zones_sub_dir + file):\n",
" print(uber_sub_dir + data_dir + file)\n",
" print(f'Downloading {base_url + t_zones_sub_dir + file} to {data_dir + t_zones_sub_dir + file}')\n",
" urllib.request.urlretrieve(base_url + t_zones_sub_dir + file, data_dir + t_zones_sub_dir + file)"
]
},
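{
"cell_type": "markdown",
"metadata": {},
"source": [
"The four download cells above repeat the same check-print-retrieve pattern. A minimal consolidation sketch follows; `download_if_missing` is a hypothetical helper, not part of the original notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# hypothetical helper consolidating the repeated download pattern above\n",
"def download_if_missing(sub_dir, file):\n",
"    src = base_url + sub_dir + file\n",
"    dst = data_dir + sub_dir + file\n",
"    if not os.path.isfile(dst):\n",
"        print(f'Downloading {src} to {dst}')\n",
"        urllib.request.urlretrieve(src, dst)\n",
"\n",
"# e.g. re-check the 2015 uber file\n",
"download_if_missing(uber_sub_dir, 'uber-raw-data-janjune-15.csv')"
]
},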
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Convert Taxi to Parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import dask_cudf\n",
"import cuspatial\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dtype_list = {'dropoff_datetime': 'str', \n",
" 'dropoff_latitude': 'float64',\n",
" 'dropoff_taxizone_id': 'float64',\n",
" 'dropoff_longitude': 'float64',\n",
" 'ehail_fee': 'float64',\n",
" 'extra': 'float64',\n",
" 'fare_amount': 'float64',\n",
" 'improvement_surcharge': 'float64',\n",
" 'junk1': 'str',\n",
" 'junk2': 'str',\n",
" 'mta_tax': 'float64',\n",
" 'passenger_count': 'str', \n",
" 'payment_type': 'str', \n",
" 'pickup_datetime': 'str', \n",
" 'pickup_latitude': 'float64',\n",
" 'pickup_taxizone_id': 'float64',\n",
" 'pickup_longitude': 'float64',\n",
" 'rate_code_id': 'str', \n",
" 'store_and_fwd_flag': 'str', \n",
" 'tip_amount': 'float64',\n",
" 'tolls_amount': 'float64',\n",
" 'total_amount': 'float64',\n",
" 'trip_distance': 'float64',\n",
" 'trip_type': 'str', \n",
" 'vendor_id': 'str', \n",
" }\n",
"\n",
"# make dict of paths to data directories\n",
"relative_path = 'raw_data'\n",
"config = {'citibike_raw_data_path': f'{relative_path}/bike/',\n",
" 'taxi_raw_data_path': f'{relative_path}/taxi/',\n",
" 'uber_raw_data_path': f'{relative_path}/uber/',\n",
" 'subway_raw_data_path': f'{relative_path}/subway/',\n",
" 'parquet_output_path': f'data/'\n",
" }\n",
"\n",
"def glob(x):\n",
" '''\n",
" Signature: sorted(glob(pathname=x, *, recursive=False))\n",
" Docstring:\n",
" Return a list of paths matching a pathname pattern.\n",
"\n",
" The pattern may contain simple shell-style wildcards a la\n",
" fnmatch. However, unlike fnmatch, filenames starting with a\n",
" dot are special cases that are not matched by '*' and '?'\n",
" patterns.\n",
"\n",
" If recursive is true, the pattern '**' will match any files and\n",
" zero or more directories and subdirectories.\n",
" '''\n",
" from glob import glob\n",
" return sorted(glob(x))"
]
},
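{
"cell_type": "markdown",
"metadata": {},
"source": [
"Quick sanity check: the sorted `glob` wrapper plus the `config` paths should list the taxi CSVs downloaded above. The pattern below mirrors the ones used inside `get_green` in the next cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# list the green taxi files fetched earlier; sorted order keeps partitions deterministic\n",
"glob(os.path.join(config['taxi_raw_data_path'], 'green_tripdata_*.csv'))"
]
},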
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_green():\n",
" green_schema_pre_2015 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,junk1,junk2\"\n",
" green_glob_pre_2015 = glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_201[34]*.csv'))\n",
"\n",
" green_schema_2015_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2\"\n",
" green_glob_2015_h1 = glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-0[1-6].csv'))\n",
"\n",
" green_schema_2015_h2_2016_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type\"\n",
" green_glob_2015_h2_2016_h1 = glob(os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-0[7-9].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2015-1[0-2].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-0[1-6].csv'))\n",
" \n",
" green_schema_2016_h2_plus = \"vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_taxizone_id,dropoff_taxizone_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2\"\n",
" green_glob_2016_h2_plus = glob(os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-0[7-9].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_2016-1[0-2].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'green_tripdata_201[7-9]*.csv'))\n",
"\n",
" # before 2015 dataframe\n",
" green1 = dask_cudf.read_csv(green_glob_pre_2015, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=green_schema_pre_2015.split(','))\n",
" green1['dropoff_taxizone_id'] = -1.0\n",
" green1['pickup_taxizone_id'] = -1.0\n",
" green1['improvement_surcharge'] = np.nan\n",
" green1 = green1.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
" # january 2015 - june 2015 dataframe\n",
" green2 = dask_cudf.read_csv(green_glob_2015_h1, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=green_schema_2015_h1.split(','))\n",
" green2['dropoff_taxizone_id'] = -1.0\n",
" green2['pickup_taxizone_id'] = -1.0\n",
" green2 = green2.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
" # july 2015 - june 2016 dataframe\n",
" green3 = dask_cudf.read_csv(green_glob_2015_h2_2016_h1, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=green_schema_2015_h2_2016_h1.split(','))\n",
" green3['dropoff_taxizone_id'] = -1.0\n",
" green3['pickup_taxizone_id'] = -1.0\n",
"\n",
" # july 2016 or later dataframe\n",
" green4 = dask_cudf.read_csv(green_glob_2016_h2_plus, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=green_schema_2016_h2_plus.split(','))\n",
" green4['dropoff_latitude'] = 0.0\n",
" green4['dropoff_longitude'] = 0.0\n",
" green4['pickup_latitude'] = 0.0\n",
" green4['pickup_longitude'] = 0.0\n",
" green4 = green4.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
" # combine dataframes\n",
" green = dask_cudf.concat([green1[sorted(green1.columns)],\n",
" green2[sorted(green1.columns)],\n",
" green3[sorted(green1.columns)],\n",
" green4[sorted(green1.columns)]]\n",
" )\n",
" for field in list(green.columns):\n",
" if field in dtype_list:\n",
" green[field] = green[field].astype(dtype_list[field])\n",
"\n",
" green['trip_type'] = 'green'\n",
"\n",
" return green"
]
},
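{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note the placeholder conventions `get_green` sets up: `-1.0` marks a missing taxi-zone id and `0.0` marks a missing coordinate. `assign_taxi_zones` further down keys off exactly these values. A toy illustration with plain cudf (made-up values, not notebook data):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import cudf\n",
"\n",
"# toy frame showing the placeholder conventions (made-up values)\n",
"toy = cudf.DataFrame({'pickup_longitude': [-73.99, 0.0],\n",
"                      'pickup_latitude': [40.73, 0.0],\n",
"                      'pickup_taxizone_id': [-1.0, 142.0]})\n",
"\n",
"# rows eligible for zone lookup: id still -1.0 and real coordinates present\n",
"(toy['pickup_taxizone_id'] == -1.0) & (toy['pickup_longitude'] != 0.0) & (toy['pickup_latitude'] != 0.0)"
]
},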
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_yellow():\n",
" # tag file paths to data and column names by schema (x < 2015, 2015 <= x <= 2016.5, 2016.5 < x)\n",
" yellow_schema_pre_2015 = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount\"\n",
" yellow_glob_pre_2015 = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_201[0-4]*.csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2009*.csv'))\n",
" yellow_schema_2015_2016_h1 = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\"\n",
" yellow_glob_2015_2016_h1 = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2015*.csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-0[1-6].csv'))\n",
" yellow_schema_2016_h2_plus = \"vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_taxizone_id,dropoff_taxizone_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,junk1,junk2\"\n",
" yellow_glob_2016_h2_plus = glob(os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-0[7-9].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_2016-1[0-2].csv')) + glob(\n",
" os.path.join(config['taxi_raw_data_path'], 'yellow_tripdata_201[7-9]*.csv'))\n",
"\n",
" # create pre 2015 dataframe\n",
" yellow1 = dask_cudf.read_csv(yellow_glob_pre_2015, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=yellow_schema_pre_2015.split(',')\n",
" )\n",
" yellow1['dropoff_taxizone_id'] = -1.0\n",
" yellow1['pickup_taxizone_id'] = -1.0\n",
" yellow1['ehail_fee'] = np.nan\n",
" yellow1['improvement_surcharge'] = np.nan\n",
" yellow1['improvement_surcharge'] = yellow1['improvement_surcharge'].astype('float32')\n",
" yellow1['trip_type'] = -1.0\n",
" \n",
" # create january 2015 - june 2016 dataframe\n",
" yellow2 = dask_cudf.read_csv(yellow_glob_2015_2016_h1, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=yellow_schema_2015_2016_h1.split(',')\n",
" )\n",
" yellow2['dropoff_taxizone_id'] = -1.0\n",
" yellow2['pickup_taxizone_id'] = -1.0\n",
" yellow2['ehail_fee'] = np.nan\n",
" yellow2['trip_type'] = -1.0\n",
"\n",
" # create post june 2016 dataframe\n",
" yellow3 = dask_cudf.read_csv(yellow_glob_2016_h2_plus, \n",
" header=0,\n",
" na_values=[\"NA\"],\n",
" parse_dates=[1, 2],\n",
" infer_datetime_format=True,\n",
" dtype=dtype_list,\n",
" names=yellow_schema_2016_h2_plus.split(',')\n",
" )\n",
" yellow3['dropoff_latitude'] = 0.0\n",
" yellow3['dropoff_longitude'] = 0.0\n",
" yellow3['pickup_latitude'] = 0.0\n",
" yellow3['pickup_longitude'] = 0.0\n",
" yellow3['ehail_fee'] = np.nan\n",
" yellow3['trip_type'] = -1.0\n",
" yellow3 = yellow3.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
" yellow = dask_cudf.concat([yellow1[sorted(yellow1.columns)], \n",
" yellow2[sorted(yellow1.columns)], \n",
" yellow3[sorted(yellow1.columns)]]\n",
" )\n",
" for field in list(yellow.columns):\n",
" if field in dtype_list:\n",
" yellow[field] = yellow[field].astype(dtype_list[field])\n",
"\n",
" yellow['trip_type'] = 'yellow'\n",
"\n",
" return yellow"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_uber():\n",
" uber_schema_2014 = \"pickup_datetime,pickup_latitude,pickup_longitude,junk1\"\n",
" uber_glob_2014 = glob(os.path.join(config['uber_raw_data_path'], 'uber*-???14.csv'))\n",
"\n",
" uber1 = dask_cudf.read_csv(uber_glob_2014, \n",
" header=0,\n",
" na_values=[\"NA\"], \n",
" parse_dates=[0,],\n",
" infer_datetime_format = True,\n",
" dtype=dtype_list,\n",
" names=uber_schema_2014.split(',')\n",
" )\n",
" uber1 = uber1.drop(['junk1',], axis=1)\n",
" uber1 = uber1.assign(pickup_taxizone_id=-1.0)\n",
"\n",
" uber_schema_2015 = \"junk1,pickup_datetime,junk2,pickup_taxizone_id\"\n",
" uber_glob_2015 = glob(os.path.join(config['uber_raw_data_path'], 'uber*15.csv'))\n",
"\n",
" uber2 = dask_cudf.read_csv(uber_glob_2015, \n",
" header=0,\n",
" na_values=[\"NA\"], \n",
" parse_dates=[1,],\n",
" infer_datetime_format = True,\n",
" dtype=dtype_list,\n",
" names=uber_schema_2015.split(',')\n",
" )\n",
" uber2 = uber2.drop(['junk1', 'junk2'], axis=1)\n",
" uber2 = uber2.assign(pickup_latitude=0.0, pickup_longitude=0.0)\n",
"\n",
" uberdf = dask_cudf.concat([uber1[sorted(uber1.columns)], \n",
" uber2[sorted(uber1.columns)]]\n",
" )\n",
" for field in dtype_list:\n",
" if (field in uberdf.columns):\n",
" uberdf[field] = uberdf[field].astype(dtype_list[field])\n",
" elif field == 'pickup_datetime':\n",
" pass\n",
" else:\n",
" uberdf[field] = np.nan\n",
" uberdf[field] = uberdf[field].astype(dtype_list[field])\n",
"\n",
" uberdf = uberdf.drop(['junk1', 'junk2'], axis=1)\n",
"\n",
"# uberdf['dropoff_datetime'] = np.datetime64(\"1970-01-01 00:00:00\")\n",
"# #uberdf = uberdf.repartition(npartitions=20)\n",
"\n",
" uberdf['trip_type'] = 'uber'\n",
"\n",
" uberdf = uberdf[sorted(uberdf.columns)]\n",
"\n",
" return uberdf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"green = get_green()\n",
"yellow = get_yellow()\n",
"uber = get_uber()\n",
"\n",
"all_trips = uber.append(green).append(yellow)"
]
},
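{
"cell_type": "markdown",
"metadata": {},
"source": [
"`append` here is just a pairwise concat; assuming the three frames expose the same column set (which the three getters arrange), an equivalent one-shot spelling is:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# equivalent one-shot spelling of the append chain above\n",
"# (assumes uber, green, and yellow share the same columns)\n",
"all_trips = dask_cudf.concat([uber, green, yellow])"
]
},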
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pip_iterations = list(np.arange(0, 263, 31))\n",
"pip_iterations.append(263)\n",
"\n",
"taxi_zones = cuspatial.read_polygon_shapefile(f'{data_dir}{t_zones_sub_dir}cu_taxi_zones.shp')\n",
"\n",
"def assign_taxi_zones(df, lon_var, lat_var, locid_var):\n",
" \"\"\"\n",
" Derives Taxi Zones from shapefile.\n",
" \n",
" This function takes longitude values provided by `lon_var`, and latitude\n",
" values provided by `lat_var` in DataFrame `df`, and performs a spatial join\n",
" with the NYC taxi_zones shapefile. \n",
" \n",
" The shapefile is hard coded in, as this function makes a hard assumption of\n",
" latitude and longitude coordinates. It also assumes latitude=0.0 and \n",
" longitude=0.0 is not a datapoint that can exist in your dataset. Which is \n",
" reasonable for a dataset of New York, but a bit edgy for a global dataset.\n",
" \n",
" Only rows where `df.lon_var`, `df.lat_var` are reasonably near New York,\n",
" and `df.locid_var` is set to -1.0 are updated.\n",
" \n",
" Parameters\n",
" ----------\n",
" df : cudf.DataFrame or dask_cudf.DataFrame\n",
" DataFrame containing latitudes, longitudes, and location_id columns.\n",
" lon_var : string\n",
" Name of column in `df` containing longitude values. Invalid values \n",
" should be -1.0.\n",
" lat_var : string\n",
" Name of column in `df` containing latitude values. Invalid values \n",
" should be -1.0\n",
" locid_var : string\n",
" Name of column in `df` containing taxi_zone location ids. Rows with\n",
" valid, nonzero values are not overwritten.\n",
" \"\"\"\n",
" # focus location columns\n",
" localdf = df[[lon_var, lat_var, locid_var]].copy()\n",
" # localdf = localdf.reset_index()\n",
" \n",
" # fill missing lat/long values\n",
" localdf[lon_var] = localdf[lon_var].fillna(value=0.0)\n",
" localdf[lat_var] = localdf[lat_var].fillna(value=0.0)\n",
" \n",
" # (bool column) is location id missing && do we have lat/long coordinates?\n",
" localdf['replace_locid'] = ((localdf[locid_var] == -1.0)\n",
" & (localdf[lon_var] != 0.0)\n",
" & (localdf[lat_var] != 0.0)\n",
" )\n",
" \n",
" # are there any values to replace?\n",
" if (np.any(localdf['replace_locid'])): # makes ~28.469% faster\n",
" # go through zones 31 at a time\n",
" for i in range(len(pip_iterations)-1):\n",
" # tag 1st and last zone #s\n",
" start = pip_iterations[i]\n",
" end = pip_iterations[i+1]\n",
" # derive taxi zones from coordinates\n",
" t_zones = cuspatial.point_in_polygon(localdf[lon_var], \n",
" localdf[lat_var], \n",
" taxi_zones[0][start:end], \n",
" taxi_zones[1], \n",
" taxi_zones[2]['x'], \n",
" taxi_zones[2]['y'])\n",
" # insert taxi zones into location id columns \n",
" for j in t_zones.columns:\n",
" localdf[locid_var].loc[t_zones[j]] = j\n",
" \n",
" return localdf[locid_var].astype('float64') \n",
"\n",
" else:\n",
" localdf[locid_var] = localdf[locid_var].astype('float64') \n",
" return localdf[locid_var]"
]
},
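{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why walk the zones 31 at a time? At the time of writing, cuspatial's `point_in_polygon` packed its result into a 32-bit mask, limiting each call to roughly 31 polygons, so the 263 taxi zones are processed in the batches defined by `pip_iterations`. The (start, end) windows:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# batch windows used by assign_taxi_zones; each covers at most 31 zones\n",
"list(zip(pip_iterations[:-1], pip_iterations[1:]))"
]
},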
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# derive & assign pickup & dropoff taxi zones \n",
"all_trips['dropoff_taxizone_id'] = all_trips.map_partitions(assign_taxi_zones, \n",
" lon_var='dropoff_longitude', \n",
" lat_var='dropoff_latitude',\n",
" locid_var='dropoff_taxizone_id', \n",
" meta=('dropoff_taxizone_id', np.float64))\n",
"all_trips['pickup_taxizone_id'] = all_trips.map_partitions(assign_taxi_zones, \n",
" lon_var='pickup_longitude', \n",
" lat_var='pickup_latitude',\n",
" locid_var='pickup_taxizone_id', \n",
" meta=('pickup_taxizone_id', np.float64))"
]
},
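{
"cell_type": "markdown",
"metadata": {},
"source": [
"`map_partitions` runs `assign_taxi_zones` independently on each cudf partition; the `meta` tuple only declares the name and dtype of the series each call returns, so dask can build the graph lazily. A toy check of the same mechanics (hypothetical values):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import cudf\n",
"\n",
"# toy map_partitions call: meta declares a float64 series named 'x'\n",
"toy = dask_cudf.from_cudf(cudf.DataFrame({'x': [1.0, 2.0, 3.0]}), npartitions=2)\n",
"toy.map_partitions(lambda part: part['x'] * 2, meta=('x', np.float64)).compute()"
]
},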
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_trips = all_trips[sorted(all_trips.columns)]\n",
"# all_trips = all_trips.repartition(npartitions=1200)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_trips = all_trips.map_partitions(lambda x: x.sort_values('pickup_datetime'), \n",
" meta=all_trips)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for fieldName in all_trips.columns:\n",
" if fieldName in dtype_list:\n",
" all_trips[fieldName] = all_trips[fieldName].astype(dtype_list[fieldName])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_trips.columns"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"from dask.distributed import Client\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from blazingsql import BlazingContext\n",
"\n",
"bc = BlazingContext(dask_client=client, network_interface='lo')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"bc.create_table('full_taxi', all_trips)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %%time\n",
"# q = bc.sql('SELECT COUNT(*) FROM full_taxi')\n",
"\n",
"# type(q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %%time\n",
"# q.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"query = '''\n",
" SELECT\n",
" *\n",
" FROM\n",
" full_taxi\n",
" WHERE\n",
" dropoff_taxizone_id IS NOT NULL \n",
" AND pickup_taxizone_id IS NOT NULL \n",
" AND dropoff_taxizone_id <> -1.0 \n",
" AND pickup_taxizone_id <> -1.0\n",
" ORDER BY\n",
" pickup_datetime\n",
" '''\n",
"df = bc.sql(query)\n",
"\n",
"type(df), len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"df.compute().to_csv('full_taxi.csv', index=False)"
]
},
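{
"cell_type": "markdown",
"metadata": {},
"source": [
"The section heading promises Parquet, but the cell above writes CSV. A hedged sketch of the Parquet equivalent, reusing the `parquet_output_path` already defined in `config` (dask_cudf writes one Parquet file per partition under the target directory):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# parquet equivalent of the CSV write above (sketch, path taken from config)\n",
"os.makedirs(config['parquet_output_path'], exist_ok=True)\n",
"df.to_parquet(config['parquet_output_path'] + 'full_taxi')"
]
},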
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"df.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.sum(df.tip_amount.isna().compute())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bc.sql('select * from full_taxi')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cols = 'pickup_taxizone_id, dropoff_taxizone_id, trip_distance, passenger_count, fare_amount, total_amount - fare_amount as tax, tip_amount'\n",
"bc.sql(f'select {cols} from full_taxi where pickup_taxizone_id = 200 and year(cast(pickup_datetime as timestamp)) between 2014 and 2018')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "RAPIDS Stable",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}