Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save mrocklin/eb7dcd5a32891a5ba7adb3de1927d175 to your computer and use it in GitHub Desktop.

Select an option

Save mrocklin/eb7dcd5a32891a5ba7adb3de1927d175 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Join Dask.DataFrame against small Pandas DataFrame\n",
"\n",
"### Advanced techniques: active memory management"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create fake dataset of products and categories"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"456976"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from itertools import product\n",
"\n",
"alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'\n",
"products = [''.join(x) for x in product(alphabet, alphabet, alphabet, alphabet)]\n",
"len(products)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['AAAA', 'AAAB', 'AAAC', 'AAAD', 'AAAE', 'AAAF', 'AAAG', 'AAAH']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[:8]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['ZZZS', 'ZZZT', 'ZZZU', 'ZZZV', 'ZZZW', 'ZZZX', 'ZZZY', 'ZZZZ']"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[-8:]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>category</th>\n",
" <th>product</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>443</td>\n",
" <td>AAAA</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>594</td>\n",
" <td>AAAB</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>201</td>\n",
" <td>AAAC</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>809</td>\n",
" <td>AAAD</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>974</td>\n",
" <td>AAAE</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" category product\n",
"0 443 AAAA\n",
"1 594 AAAB\n",
"2 201 AAAC\n",
"3 809 AAAD\n",
"4 974 AAAE"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"dimension = pd.DataFrame({'product': products, \n",
" 'category': np.random.randint(0, 1000, size=len(products))})\n",
"dimension.head()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def fake_data(size):\n",
" import random\n",
" import numpy as np\n",
" return pd.DataFrame({'product': [''.join(random.sample(alphabet, 4)) for i in range(size)],\n",
" 'value': np.random.exponential(100, size=size).astype(int),\n",
" 'rating': np.random.normal(0, 1, size=size)})"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>BHYI</td>\n",
" <td>-2.189738</td>\n",
" <td>13</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>XFAS</td>\n",
" <td>-0.478826</td>\n",
" <td>72</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ILBA</td>\n",
" <td>1.176059</td>\n",
" <td>35</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>VXYS</td>\n",
" <td>0.872137</td>\n",
" <td>51</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>GVEH</td>\n",
" <td>0.897264</td>\n",
" <td>22</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value\n",
"0 BHYI -2.189738 13\n",
"1 XFAS -0.478826 72\n",
"2 ILBA 1.176059 35\n",
"3 VXYS 0.872137 51\n",
"4 GVEH 0.897264 22"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example = fake_data(5)\n",
"example"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" <th>category</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>BHYI</td>\n",
" <td>-2.189738</td>\n",
" <td>13</td>\n",
" <td>684</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>XFAS</td>\n",
" <td>-0.478826</td>\n",
" <td>72</td>\n",
" <td>832</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ILBA</td>\n",
" <td>1.176059</td>\n",
" <td>35</td>\n",
" <td>539</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>VXYS</td>\n",
" <td>0.872137</td>\n",
" <td>51</td>\n",
" <td>375</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>GVEH</td>\n",
" <td>0.897264</td>\n",
" <td>22</td>\n",
" <td>749</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value category\n",
"0 BHYI -2.189738 13 684\n",
"1 XFAS -0.478826 72 832\n",
"2 ILBA 1.176059 35 539\n",
"3 VXYS 0.872137 51 375\n",
"4 GVEH 0.897264 22 749"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example.merge(dimension, on='product', how='inner')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallelize with Dask.DataFrame\n",
"\n",
"We do the same join, but now in parallel with dask.dataframe.\n",
"\n",
"We start on a single machine. We'll repeat on a cluster later."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make a fake fact table\n",
"\n",
"Normally we would load our data in from some external source, as in the following:\n",
"\n",
"```python\n",
"df = dask.dataframe.read_csv('hdfs://path/to/my/data/*.csv')\n",
"```\n",
"\n",
"Instead we use the `fake_data` function defined above to produce each \"file\". You can ignore this if you have data elsewhere."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"from dask import delayed\n",
"\n",
"partitions = []\n",
"for i in range(10):\n",
" partitions.append(delayed(fake_data)(10000))\n",
" \n",
"example = fake_data(1)\n",
"\n",
"df = dd.from_delayed(partitions, example)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame<from-de..., npartitions=10>"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>JHWM</td>\n",
" <td>-0.342292</td>\n",
" <td>198</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>DHCV</td>\n",
" <td>-1.399805</td>\n",
" <td>59</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>QHYL</td>\n",
" <td>0.176286</td>\n",
" <td>112</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>KAHU</td>\n",
" <td>0.476704</td>\n",
" <td>35</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>ULGB</td>\n",
" <td>0.635553</td>\n",
" <td>13</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value\n",
"0 JHWM -0.342292 198\n",
"1 DHCV -1.399805 59\n",
"2 QHYL 0.176286 112\n",
"3 KAHU 0.476704 35\n",
"4 ULGB 0.635553 13"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" <th>category</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>UFBO</td>\n",
" <td>-1.114049</td>\n",
" <td>28</td>\n",
" <td>168</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>OANM</td>\n",
" <td>-1.240420</td>\n",
" <td>75</td>\n",
" <td>521</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>KAZF</td>\n",
" <td>1.289296</td>\n",
" <td>103</td>\n",
" <td>744</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>BWYU</td>\n",
" <td>-0.755966</td>\n",
" <td>152</td>\n",
" <td>421</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>VHFT</td>\n",
" <td>0.277356</td>\n",
" <td>48</td>\n",
" <td>400</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value category\n",
"0 UFBO -1.114049 28 168\n",
"1 OANM -1.240420 75 521\n",
"2 KAZF 1.289296 103 744\n",
"3 BWYU -0.755966 152 421\n",
"4 VHFT 0.277356 48 400"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined = df.merge(dimension, how='inner', on='product')\n",
"joined.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Analyze the joined table\n",
"\n",
"We'll find the ten categories with the highest mean rating."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"category\n",
"98 0.295137\n",
"706 0.273313\n",
"111 0.265551\n",
"798 0.263368\n",
"110 0.255219\n",
"550 0.250642\n",
"893 0.249949\n",
"647 0.244770\n",
"66 0.243825\n",
"816 0.237187\n",
"Name: rating, dtype: float64"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined.groupby('category').rating.mean().nlargest(10).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed computing\n",
"\n",
"We connect to a cluster of workers and repeat the experiment at a larger scale (1000 partitions instead of 10)."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Executor: scheduler=localhost:8786 workers=320 threads=320>"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Executor, progress\n",
"e = Executor('localhost:8786')\n",
"e"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dfs = [delayed(fake_data)(10000) for i in range(1000)]\n",
"example = fake_data(1)\n",
"df = dd.from_delayed(dfs, example)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame<from-de..., npartitions=1000>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
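{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scatter the dimension table\n",
"\n",
"We turn the small pandas table into a single-partition dask dataframe, persist it on the cluster, and replicate it across the workers."
]
},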
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>category</th>\n",
" <th>product</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>443</td>\n",
" <td>AAAA</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>594</td>\n",
" <td>AAAB</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>201</td>\n",
" <td>AAAC</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>809</td>\n",
" <td>AAAD</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>974</td>\n",
" <td>AAAE</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" category product\n",
"0 443 AAAA\n",
"1 594 AAAB\n",
"2 201 AAAC\n",
"3 809 AAAD\n",
"4 974 AAAE"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dimension.head()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dimension2 = dd.from_pandas(dimension, npartitions=1)\n",
"dimension2 = e.persist(dimension2)\n",
"e.replicate(dimension2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Persist Data on Cluster"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = e.persist(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"joined = df.merge(dimension2, how='inner', on='product')\n",
"result = joined.groupby('category').rating.mean().nlargest(10)\n",
"future = e.compute(result)\n",
"progress(future)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dimension.head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment