Created
October 27, 2020 15:16
-
-
Save jorisvandenbossche/88fbae6c330da9a9f26394a95a82c708 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Demo of Arrow Datasets (using pyarrow / pandas / dask)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import pandas as pd\n", | |
| "import pyarrow as pa\n", | |
| "import pyarrow.dataset as ds" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "We have a part of the NYC taxi data (2.5 years: July 2016 - 2018; 6GB compressed Parquet files, 24GB in uncompressed Arrow IPC)." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Filtering a dataset with pyarrow.dataset" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Opening the dataset:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "dataset = ds.dataset(\"nyc-taxi-data/dask-partitioned/\", format=\"parquet\", partitioning=\"hive\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Getting a subset of the data (column projection + filter on the rows). In this case, the result is small enough that it fits easily into memory, so we can scan it all at once and convert it to a pyarrow Table using `to_table()`:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "table = dataset.to_table(\n", | |
| " columns=[\"tip_amount\", \"total_amount\", \"passenger_count\"],\n", | |
| " filter=(ds.field(\"year\") == 2017) & (ds.field(\"total_amount\") > 100)\n", | |
| ")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "To do further computations, we can convert it to a pandas DataFrame:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "df = table.to_pandas()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "passenger_count\n", | |
| "0 7.572316\n", | |
| "1 16.658665\n", | |
| "2 16.658453\n", | |
| "3 16.635687\n", | |
| "4 11.673152\n", | |
| "5 16.666666\n", | |
| "6 16.666666\n", | |
| "7 16.666668\n", | |
| "8 16.666668\n", | |
| "9 16.665361\n", | |
| "Name: tip_pct, dtype: float32" | |
| ] | |
| }, | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "df[\"tip_pct\"] = df[\"tip_amount\"] / df[\"total_amount\"] * 100\n", | |
| "df.groupby(\"passenger_count\")[\"tip_pct\"].median()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<div>\n", | |
| "<style scoped>\n", | |
| " .dataframe tbody tr th:only-of-type {\n", | |
| " vertical-align: middle;\n", | |
| " }\n", | |
| "\n", | |
| " .dataframe tbody tr th {\n", | |
| " vertical-align: top;\n", | |
| " }\n", | |
| "\n", | |
| " .dataframe thead th {\n", | |
| " text-align: right;\n", | |
| " }\n", | |
| "</style>\n", | |
| "<table border=\"1\" class=\"dataframe\">\n", | |
| " <thead>\n", | |
| " <tr style=\"text-align: right;\">\n", | |
| " <th></th>\n", | |
| " <th>tip_pct</th>\n", | |
| " <th>n</th>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>passenger_count</th>\n", | |
| " <th></th>\n", | |
| " <th></th>\n", | |
| " </tr>\n", | |
| " </thead>\n", | |
| " <tbody>\n", | |
| " <tr>\n", | |
| " <th>0</th>\n", | |
| " <td>7.572316</td>\n", | |
| " <td>627</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>1</th>\n", | |
| " <td>16.658665</td>\n", | |
| " <td>141262</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>2</th>\n", | |
| " <td>16.658453</td>\n", | |
| " <td>33369</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>3</th>\n", | |
| " <td>16.635687</td>\n", | |
| " <td>9285</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>4</th>\n", | |
| " <td>11.673152</td>\n", | |
| " <td>4912</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>5</th>\n", | |
| " <td>16.666666</td>\n", | |
| " <td>6443</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>6</th>\n", | |
| " <td>16.666666</td>\n", | |
| " <td>3588</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>7</th>\n", | |
| " <td>16.666668</td>\n", | |
| " <td>22</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>8</th>\n", | |
| " <td>16.666668</td>\n", | |
| " <td>56</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>9</th>\n", | |
| " <td>16.665361</td>\n", | |
| " <td>125</td>\n", | |
| " </tr>\n", | |
| " </tbody>\n", | |
| "</table>\n", | |
| "</div>" | |
| ], | |
| "text/plain": [ | |
| " tip_pct n\n", | |
| "passenger_count \n", | |
| "0 7.572316 627\n", | |
| "1 16.658665 141262\n", | |
| "2 16.658453 33369\n", | |
| "3 16.635687 9285\n", | |
| "4 11.673152 4912\n", | |
| "5 16.666666 6443\n", | |
| "6 16.666666 3588\n", | |
| "7 16.666668 22\n", | |
| "8 16.666668 56\n", | |
| "9 16.665361 125" | |
| ] | |
| }, | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "df.groupby(\"passenger_count\").aggregate(tip_pct=(\"tip_pct\", \"median\"), n=(\"tip_pct\", \"size\"))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Timing the full sequence:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "CPU times: user 2.9 s, sys: 163 ms, total: 3.06 s\n", | |
| "Wall time: 564 ms\n" | |
| ] | |
| }, | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<div>\n", | |
| "<style scoped>\n", | |
| " .dataframe tbody tr th:only-of-type {\n", | |
| " vertical-align: middle;\n", | |
| " }\n", | |
| "\n", | |
| " .dataframe tbody tr th {\n", | |
| " vertical-align: top;\n", | |
| " }\n", | |
| "\n", | |
| " .dataframe thead th {\n", | |
| " text-align: right;\n", | |
| " }\n", | |
| "</style>\n", | |
| "<table border=\"1\" class=\"dataframe\">\n", | |
| " <thead>\n", | |
| " <tr style=\"text-align: right;\">\n", | |
| " <th></th>\n", | |
| " <th>tip_pct</th>\n", | |
| " <th>n</th>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>passenger_count</th>\n", | |
| " <th></th>\n", | |
| " <th></th>\n", | |
| " </tr>\n", | |
| " </thead>\n", | |
| " <tbody>\n", | |
| " <tr>\n", | |
| " <th>0</th>\n", | |
| " <td>7.572316</td>\n", | |
| " <td>627</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>1</th>\n", | |
| " <td>16.658665</td>\n", | |
| " <td>141262</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>2</th>\n", | |
| " <td>16.658453</td>\n", | |
| " <td>33369</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>3</th>\n", | |
| " <td>16.635687</td>\n", | |
| " <td>9285</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>4</th>\n", | |
| " <td>11.673152</td>\n", | |
| " <td>4912</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>5</th>\n", | |
| " <td>16.666666</td>\n", | |
| " <td>6443</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>6</th>\n", | |
| " <td>16.666666</td>\n", | |
| " <td>3588</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>7</th>\n", | |
| " <td>16.666668</td>\n", | |
| " <td>22</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>8</th>\n", | |
| " <td>16.666668</td>\n", | |
| " <td>56</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>9</th>\n", | |
| " <td>16.665361</td>\n", | |
| " <td>125</td>\n", | |
| " </tr>\n", | |
| " </tbody>\n", | |
| "</table>\n", | |
| "</div>" | |
| ], | |
| "text/plain": [ | |
| " tip_pct n\n", | |
| "passenger_count \n", | |
| "0 7.572316 627\n", | |
| "1 16.658665 141262\n", | |
| "2 16.658453 33369\n", | |
| "3 16.635687 9285\n", | |
| "4 11.673152 4912\n", | |
| "5 16.666666 6443\n", | |
| "6 16.666666 3588\n", | |
| "7 16.666668 22\n", | |
| "8 16.666668 56\n", | |
| "9 16.665361 125" | |
| ] | |
| }, | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "%%time\n", | |
| "table = dataset.to_table(columns=[\"tip_amount\", \"total_amount\", \"passenger_count\"], filter=(ds.field(\"year\") == 2017) & (ds.field(\"total_amount\") > 100))\n", | |
| "df = table.to_pandas()\n", | |
| "df[\"tip_pct\"] = df[\"tip_amount\"] / df[\"total_amount\"] * 100\n", | |
| "df.groupby(\"passenger_count\").aggregate(tip_pct=(\"tip_pct\", \"median\"), n=(\"tip_pct\", \"size\"))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Reading using pandas" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "In the case above, where the queried data is small enough to fit in memory, we could also directly use pandas to read the data:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "df = pd.read_parquet(\"nyc-taxi-data/dask-partitioned/\", columns=[\"tip_amount\", \"total_amount\", \"passenger_count\"],\n", | |
| " filters=[(\"year\", \"==\", 2017), (\"total_amount\", \">\", 100)])" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 9, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "passenger_count\n", | |
| "0 7.572316\n", | |
| "1 16.658665\n", | |
| "2 16.658453\n", | |
| "3 16.635687\n", | |
| "4 11.673152\n", | |
| "5 16.666666\n", | |
| "6 16.666666\n", | |
| "7 16.666668\n", | |
| "8 16.666668\n", | |
| "9 16.665361\n", | |
| "Name: tip_pct, dtype: float32" | |
| ] | |
| }, | |
| "execution_count": 9, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "df[\"tip_pct\"] = df[\"tip_amount\"] / df[\"total_amount\"] * 100\n", | |
| "df.groupby(\"passenger_count\")[\"tip_pct\"].median()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Reading using dask" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "When the data is too large to directly handle, we can also make use of the integration of the pyarrow Parquet reader with dask:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 10, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import dask\n", | |
| "import dask.dataframe as dd" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 11, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "ddf = dd.read_parquet(\"nyc-taxi-data/dask-partitioned/\", engine=\"pyarrow\", split_row_groups=False)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "For example, let's calculate the average trip distance for the full dataset:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 12, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "CPU times: user 20.3 s, sys: 1.33 s, total: 21.7 s\n", | |
| "Wall time: 6.47 s\n" | |
| ] | |
| }, | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "3.319460242014189" | |
| ] | |
| }, | |
| "execution_count": 12, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "%%time\n", | |
| "ddf[\"trip_distance\"].mean().compute()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Manual yielding of record batches\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 13, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "dataset = ds.dataset(\"nyc-taxi-data/dask-partitioned/\", format=\"parquet\", partitioning=\"hive\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 14, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "CPU times: user 1.32 s, sys: 90.7 ms, total: 1.41 s\n", | |
| "Wall time: 1.41 s\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "%%time\n", | |
| "\n", | |
| "num_rows = 0\n", | |
| "\n", | |
| "for task in dataset.scan(columns=[\"trip_distance\"]):\n", | |
| " for record_batch in task.execute():\n", | |
| " num_rows += record_batch.num_rows" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 15, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "278059647" | |
| ] | |
| }, | |
| "execution_count": 15, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "num_rows" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python (arrow-dev)", | |
| "language": "python", | |
| "name": "arrow-dev" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.7.3" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 4 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment