Last active
March 28, 2019 01:50
-
-
Save VibhuJawa/7947f1d87016c144cb4bc6f788f6ab21 to your computer and use it in GitHub Desktop.
This gist tests joins on dask_cudf frames using weather data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Download Required Data" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import os\n", | |
| "import urllib.request\n", | |
| "\n", | |
| "station_meta_url = 'https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt'\n", | |
| "base_url = 'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/'\n", | |
| "data_dir = './'\n", | |
| "\n", | |
| "# download station metadata\n", | |
| "if not os.path.isfile(data_dir+'ghcnd-stations.txt'):\n", | |
| " print('Downloading station meta..')\n", | |
| " urllib.request.urlretrieve(station_meta_url, data_dir+'ghcnd-stations.txt')\n", | |
| "\n", | |
| "# download station observations\n", | |
| "years = list(range(2017, 2020))\n", | |
| "for year in years:\n", | |
| " fn = str(year) + '.csv.gz'\n", | |
| " if not os.path.isfile(data_dir+fn):\n", | |
| " print(f'Downloading {base_url+fn} to {data_dir+fn}')\n", | |
| " urllib.request.urlretrieve(base_url+fn, data_dir+fn)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Start a Dask Client on a LocalCUDACluster" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "/conda/envs/rapids/lib/python3.6/site-packages/dask/config.py:168: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.\n", | |
| " data = yaml.load(f.read()) or {}\n", | |
| "/conda/envs/rapids/lib/python3.6/site-packages/distributed/config.py:20: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.\n", | |
| " defaults = yaml.load(f)\n" | |
| ] | |
| }, | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<table style=\"border: 2px solid white;\">\n", | |
| "<tr>\n", | |
| "<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
| "<h3>Client</h3>\n", | |
| "<ul>\n", | |
| " <li><b>Scheduler: </b>tcp://172.17.0.2:44122\n", | |
| " <li><b>Dashboard: </b><a href='http://172.17.0.2:9800/status' target='_blank'>http://172.17.0.2:9800/status</a>\n", | |
| "</ul>\n", | |
| "</td>\n", | |
| "<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
| "<h3>Cluster</h3>\n", | |
| "<ul>\n", | |
| " <li><b>Workers: </b>7</li>\n", | |
| " <li><b>Cores: </b>7</li>\n", | |
| " <li><b>Memory: </b>540.95 GB</li>\n", | |
| "</ul>\n", | |
| "</td>\n", | |
| "</tr>\n", | |
| "</table>" | |
| ], | |
| "text/plain": [ | |
| "<Client: scheduler='tcp://172.17.0.2:44122' processes=7 cores=7>" | |
| ] | |
| }, | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "from dask.distributed import Client, wait\n", | |
| "from dask_cuda import LocalCUDACluster\n", | |
| "import dask\n", | |
| "import os\n", | |
| "\n", | |
| "import cudf, dask_cudf\n", | |
| "\n", | |
| "# Use dask-cuda to start one worker per GPU on a single-node system\n", | |
| "# When you shutdown this notebook kernel, the Dask cluster also shuts down.\n", | |
| "cluster = LocalCUDACluster(ip='0.0.0.0',diagnostics_port=9800)\n", | |
| "client = Client(cluster)\n", | |
| "# print client info\n", | |
| "client" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Load Weather Dataframe" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0.0.0.dev0-py3.6.egg/dask_cudf/io/csv.py:44: UserWarning: Warning gzip compression does not support breaking apart files\n", | |
| "Please ensure that each individual file can fit in memory and\n", | |
| "use the keyword ``chunksize=None to remove this message``\n", | |
| "Setting ``chunksize=(size of file)``\n", | |
| " \"Setting ``chunksize=(size of file)``\" % compression\n" | |
| ] | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Weather single part len 34,591,359\n", | |
| "Weather Multipart len 34,591,359\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "names = [\"id\", \"date\", \"type\", \"val\"]\n", | |
| "usecols = names[0:4]\n", | |
| "weather_df_single_part = dask_cudf.read_csv('2017.csv.gz', names=names,compression='gzip')\n", | |
| "print(\"Weather single part len {:,}\".format(len(weather_df_single_part)))\n", | |
| "\n", | |
| "weather_df_multi_part = weather_df_single_part.repartition(npartitions=2)\n", | |
| "weather_df_multi_part = weather_df_multi_part.persist()\n", | |
| "print(\"Weather Multipart len {:,}\".format(len(weather_df_multi_part)))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Load Stations Dataframe" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import nvstrings \n", | |
| "\n", | |
| "lines = nvstrings.from_csv('ghcnd-stations.txt', 0)\n", | |
| "\n", | |
| "# Create a new dataframe to hold each column\n", | |
| "station_df = cudf.DataFrame()\n", | |
| "\n", | |
| "# Note that you can chain nvstrings operators together\n", | |
| "station_df['id'] = lines.slice(0, 11).strip()\n", | |
| "station_df['latitude'] = lines.slice(12, 20).strip().stof()\n", | |
| "station_df['longitude'] = lines.slice(21, 30).strip().stof()\n", | |
| "\n", | |
| "station_df.head().to_pandas()\n", | |
| "\n", | |
| "station_df = dask_cudf.from_cudf(station_df,npartitions=1)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Merge" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Length of merged df 34,591,359 single part\n", | |
| "Length of merged df 17,202,761 multi_part\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "joined_dask_singlepart = station_df.merge(weather_df_single_part, on=['id'], how='inner')\n", | |
| "print(\"Length of merged df {:,} single part\".format(len(joined_dask_singlepart)))\n", | |
| "\n", | |
| "joined_dask_multipart = station_df.merge(weather_df_multi_part, on=['id'], how='inner')\n", | |
| "print(\"Length of merged df {:,} multi_part\".format(len(joined_dask_multipart)))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Merge Visualizations" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Uncomment to write the task graph of the multi-part merge to multipart_merge.svg.\n", | |
| "# joined_dask_multipart.visualize('multipart_merge.svg')" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Alternative spelling of the multi-part inner join via dask.dataframe.merge.\n", | |
| "# joined_df = dask.dataframe.merge(station_df,weather_df_multi_part,on=['id'], how='inner')" | |
| ] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.6.8" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment