Skip to content

Instantly share code, notes, and snippets.

@VibhuJawa
Last active March 28, 2019 01:50
Show Gist options
  • Select an option

  • Save VibhuJawa/7947f1d87016c144cb4bc6f788f6ab21 to your computer and use it in GitHub Desktop.

Select an option

Save VibhuJawa/7947f1d87016c144cb4bc6f788f6ab21 to your computer and use it in GitHub Desktop.
This gist tests joins on dask_cudf frames using weather data
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Download Required Data"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import urllib.request\n",
"\n",
"station_meta_url = 'https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt'\n",
"base_url = 'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/'\n",
"data_dir = './'\n",
"\n",
"# Fetch the station metadata file once; a file left by a previous run is reused.\n",
"station_path = data_dir + 'ghcnd-stations.txt'\n",
"if not os.path.isfile(station_path):\n",
"    print('Downloading station meta..')\n",
"    urllib.request.urlretrieve(station_meta_url, station_path)\n",
"\n",
"# Fetch one gzipped observations file per year (2017-2019), also cached on disk.\n",
"years = list(range(2017, 2020))\n",
"for year in years:\n",
"    fn = f'{year}.csv.gz'\n",
"    target = data_dir + fn\n",
"    if not os.path.isfile(target):\n",
"        print(f'Downloading {base_url+fn} to {target}')\n",
"        urllib.request.urlretrieve(base_url + fn, target)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Start Dask_cudf client"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/conda/envs/rapids/lib/python3.6/site-packages/dask/config.py:168: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.\n",
" data = yaml.load(f.read()) or {}\n",
"/conda/envs/rapids/lib/python3.6/site-packages/distributed/config.py:20: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.\n",
" defaults = yaml.load(f)\n"
]
},
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://172.17.0.2:44122\n",
" <li><b>Dashboard: </b><a href='http://172.17.0.2:9800/status' target='_blank'>http://172.17.0.2:9800/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>7</li>\n",
" <li><b>Cores: </b>7</li>\n",
" <li><b>Memory: </b>540.95 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://172.17.0.2:44122' processes=7 cores=7>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client, wait\n",
"from dask_cuda import LocalCUDACluster\n",
"import dask\n",
"import os\n",
"\n",
"import cudf, dask_cudf\n",
"\n",
"# Use dask-cuda to start one worker per GPU on a single-node system\n",
"# When you shutdown this notebook kernel, the Dask cluster also shuts down.\n",
"cluster = LocalCUDACluster(ip='0.0.0.0',diagnostics_port=9800)\n",
"client = Client(cluster)\n",
"# print client info\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load Weather DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0.0.0.dev0-py3.6.egg/dask_cudf/io/csv.py:44: UserWarning: Warning gzip compression does not support breaking apart files\n",
"Please ensure that each individual file can fit in memory and\n",
"use the keyword ``chunksize=None to remove this message``\n",
"Setting ``chunksize=(size of file)``\n",
" \"Setting ``chunksize=(size of file)``\" % compression\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Weather single part len 34,591,359\n",
"Weather Multipart len 34,591,359\n"
]
}
],
"source": [
"# Column names for the leading fields of the GHCN-Daily by-year CSV layout.\n",
"names = [\"id\", \"date\", \"type\", \"val\"]\n",
"\n",
"# NOTE(review): gzip files cannot be split, so this produces a single\n",
"# partition (see the UserWarning in the recorded output). The unused\n",
"# `usecols = names[0:4]` assignment was removed; it was never passed to\n",
"# read_csv and had no effect.\n",
"weather_df_single_part = dask_cudf.read_csv('2017.csv.gz', names=names,compression='gzip')\n",
"print(\"Weather single part len {:,}\".format(len(weather_df_single_part)))\n",
"\n",
"# Repartition so the merge below is also exercised against a\n",
"# multi-partition frame; persist to materialize it on the workers.\n",
"weather_df_multi_part = weather_df_single_part.repartition(npartitions=2)\n",
"weather_df_multi_part = weather_df_multi_part.persist()\n",
"print(\"Weather Multipart len {:,}\".format(len(weather_df_multi_part)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load Stations DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import nvstrings\n",
"\n",
"# Read ghcnd-stations.txt as a single nvstrings column of raw lines\n",
"# (fixed-width file; column 0 holds the whole line).\n",
"lines = nvstrings.from_csv('ghcnd-stations.txt', 0)\n",
"\n",
"# Create a new dataframe to hold each column.\n",
"# Note that you can chain nvstrings operators together; the slice offsets\n",
"# follow the fixed character positions of the station-file format.\n",
"station_df = cudf.DataFrame()\n",
"station_df['id'] = lines.slice(0, 11).strip()\n",
"station_df['latitude'] = lines.slice(12, 20).strip().stof()\n",
"station_df['longitude'] = lines.slice(21, 30).strip().stof()\n",
"\n",
"# Removed a dead `station_df.head().to_pandas()` statement: its result was\n",
"# discarded (not the cell's last expression, and the recorded outputs were\n",
"# empty), so it only burned work.\n",
"\n",
"# Wrap in a one-partition dask_cudf frame so it can be merged below.\n",
"station_df = dask_cudf.from_cudf(station_df, npartitions=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Merge"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Length of merged df 34,591,359 single part\n",
"Length of merged df 17,202,761 multi_part\n"
]
}
],
"source": [
"joined_dask_singlepart = station_df.merge(weather_df_single_part, on=['id'], how='inner')\n",
"print(\"Length of merged df {:,} single part\".format(len(joined_dask_singlepart)))\n",
"\n",
"joined_dask_multipart = station_df.merge(weather_df_multi_part, on=['id'], how='inner')\n",
"print(\"Length of merged df {:,} multi_part\".format(len(joined_dask_multipart)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Merge Visualizations"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# joined_dask_multipart.visualize('multipart_merge.svg')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# joined_df = dask.dataframe.merge(station_df,weather_df_multi_part,on=['id'], how='inner')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment