@mrocklin
Created April 19, 2016 22:00
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Ad-Hoc Distributed Random Forests on NYCTaxi Dataframes\n",
"=======================================================\n",
"\n",
"Using Dask.distributed and Scikit-Learn, we train a distributed random forest on the NYCTaxi data.\n",
"\n",
"**Learning Objective**: Predict passenger counts given fare, distance, location, etc.\n",
"\n",
"**Actual Objective**: Show how to use dask.distributed in a free-form way, without collections.\n",
"\n",
"**Disclaimer**: Our machine learning approach is flawed.\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from sklearn.ensemble import RandomForestClassifier\n",
"from sklearn.cross_validation import train_test_split"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Executor: scheduler=52.91.1.177:8786 workers=48 threads=48>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from distributed import Executor, progress, wait\n",
"e = Executor('52.91.1.177:8786')\n",
"e"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## NYCTaxi data living on S3\n",
"\n",
"The 2015 data takes up roughly 60GB in RAM.\n",
"\n",
"We'll try to predict `passenger_count` given the other numeric columns."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Setting global dask scheduler to use distributed\n"
]
}
],
"source": [
"from distributed import s3\n",
"\n",
"dfs = s3.read_csv('dask-data/nyc-taxi/2015', \n",
" parse_dates=['tpep_pickup_datetime', \n",
" 'tpep_dropoff_datetime'],\n",
" collection=False)\n",
"\n",
"dfs = e.compute(dfs)\n",
"progress(dfs)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[<Future: status: finished, type: DataFrame, key: finalize-a06c3dd25769f434978fa27d5a4cf24b>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-7dcb27364a8701f45cb02d2fe034728a>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-b0dfe075000bd59c3a90bfdf89a990da>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-1c9bb25cefa1b892fac9b48c0aef7e04>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-c8254256b09ae287badca3cf6d9e3142>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-068ad2a451f0de5e5399a279e3e30a46>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-05d1aadcb3e96263de461e2e31baed2f>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-d2a698c51a3c4ab07755598a128a1d27>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-65e7b6d583a829ff2fc352f16a533a93>,\n",
" <Future: status: finished, type: DataFrame, key: finalize-0e409cb127058af5445427536ccd11b5>]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dfs[:10]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df = dfs[0].result()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>VendorID</th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>pickup_longitude</th>\n",
" <th>pickup_latitude</th>\n",
" <th>RateCodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>dropoff_longitude</th>\n",
" <th>dropoff_latitude</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>861675</th>\n",
" <td>2</td>\n",
" <td>2015-01-29 04:15:00</td>\n",
" <td>2015-01-29 04:20:53</td>\n",
" <td>1</td>\n",
" <td>2.07</td>\n",
" <td>-73.987602</td>\n",
" <td>40.741913</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.980034</td>\n",
" <td>40.765118</td>\n",
" <td>1</td>\n",
" <td>8.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>1.70</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>11.00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>861676</th>\n",
" <td>2</td>\n",
" <td>2015-01-29 04:15:02</td>\n",
" <td>2015-01-29 04:26:53</td>\n",
" <td>1</td>\n",
" <td>3.03</td>\n",
" <td>-73.996788</td>\n",
" <td>40.716919</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.969688</td>\n",
" <td>40.753460</td>\n",
" <td>1</td>\n",
" <td>12.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>2.66</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>15.96</td>\n",
" </tr>\n",
" <tr>\n",
" <th>861677</th>\n",
" <td>2</td>\n",
" <td>2015-01-29 04:15:04</td>\n",
" <td>2015-01-29 04:28:21</td>\n",
" <td>2</td>\n",
" <td>3.86</td>\n",
" <td>-73.968002</td>\n",
" <td>40.759602</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.913498</td>\n",
" <td>40.765415</td>\n",
" <td>1</td>\n",
" <td>14.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>3.06</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>18.36</td>\n",
" </tr>\n",
" <tr>\n",
" <th>861678</th>\n",
" <td>2</td>\n",
" <td>2015-01-29 04:15:14</td>\n",
" <td>2015-01-29 04:27:06</td>\n",
" <td>2</td>\n",
" <td>3.58</td>\n",
" <td>-73.987610</td>\n",
" <td>40.718311</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.929070</td>\n",
" <td>40.693588</td>\n",
" <td>1</td>\n",
" <td>13.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>3.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>17.30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>861679</th>\n",
" <td>2</td>\n",
" <td>2015-01-29 04:15:15</td>\n",
" <td>2015-01-29 04:23:06</td>\n",
" <td>1</td>\n",
" <td>2.54</td>\n",
" <td>-74.004921</td>\n",
" <td>40.737560</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.978279</td>\n",
" <td>40.762989</td>\n",
" <td>1</td>\n",
" <td>9.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>1.80</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>12.60</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"861675 2 2015-01-29 04:15:00 2015-01-29 04:20:53 1 \n",
"861676 2 2015-01-29 04:15:02 2015-01-29 04:26:53 1 \n",
"861677 2 2015-01-29 04:15:04 2015-01-29 04:28:21 2 \n",
"861678 2 2015-01-29 04:15:14 2015-01-29 04:27:06 2 \n",
"861679 2 2015-01-29 04:15:15 2015-01-29 04:23:06 1 \n",
"\n",
" trip_distance pickup_longitude pickup_latitude RateCodeID \\\n",
"861675 2.07 -73.987602 40.741913 1 \n",
"861676 3.03 -73.996788 40.716919 1 \n",
"861677 3.86 -73.968002 40.759602 1 \n",
"861678 3.58 -73.987610 40.718311 1 \n",
"861679 2.54 -74.004921 40.737560 1 \n",
"\n",
" store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \\\n",
"861675 N -73.980034 40.765118 1 \n",
"861676 N -73.969688 40.753460 1 \n",
"861677 N -73.913498 40.765415 1 \n",
"861678 N -73.929070 40.693588 1 \n",
"861679 N -73.978279 40.762989 1 \n",
"\n",
" fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"861675 8.0 0.5 0.5 1.70 0.0 \n",
"861676 12.0 0.5 0.5 2.66 0.0 \n",
"861677 14.0 0.5 0.5 3.06 0.0 \n",
"861678 13.0 0.5 0.5 3.00 0.0 \n",
"861679 9.5 0.5 0.5 1.80 0.0 \n",
"\n",
" improvement_surcharge total_amount \n",
"861675 0.3 11.00 \n",
"861676 0.3 15.96 \n",
"861677 0.3 18.36 \n",
"861678 0.3 17.30 \n",
"861679 0.3 12.60 "
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.tail()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"178"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(dfs)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',\n",
" 'passenger_count', 'trip_distance', 'pickup_longitude',\n",
" 'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag',\n",
" 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount',\n",
" 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',\n",
" 'improvement_surcharge', 'total_amount'],\n",
" dtype='object')"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.columns"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start with a sample on a single machine "
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df_train, df_test = train_test_split(df)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 10.6 s, sys: 72.9 ms, total: 10.6 s\n",
"Wall time: 10.6 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"columns = ['trip_distance', 'pickup_longitude', 'pickup_latitude', \n",
" 'dropoff_longitude', 'dropoff_latitude', 'payment_type', \n",
" 'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']\n",
"\n",
"est = RandomForestClassifier(n_estimators=4)\n",
"est.fit(df_train[columns], df_train.passenger_count)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Score results"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.65808188654721012"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"est.score(df_test[columns], df_test.passenger_count)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"OK, 65% accuracy isn't bad.\n",
"\n",
"But really, always guessing a single passenger does slightly better, at about 71% on the same test set."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.70669390028780987"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from sklearn.metrics import accuracy_score\n",
"import numpy as np\n",
"accuracy_score(df_test.passenger_count, \n",
" np.ones_like(df_test.passenger_count))"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.70669390028780987"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"(df_test.passenger_count == 1).sum() / len(df_test)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So let's just be upfront that I'm probably not choosing the correct algorithms here. Machine learning requires at least a little bit of expertise to do well."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Distributed fit with `e.map`\n",
"\n",
"Let's keep going through the motions of fitting on a cluster though. It'll be informative, I promise.\n",
"\n",
"We'll map a function across our futures with `e.map`."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"178"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(dfs)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def fit(df):\n",
" est = RandomForestClassifier(n_estimators=4)\n",
" est.fit(df[columns], df.passenger_count)\n",
" return est\n",
"\n",
"train = dfs[:-1]\n",
"test = dfs[-1]\n",
"\n",
"estimators = e.map(fit, train)\n",
"progress(estimators, complete=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Broadcast our test data across all nodes"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Future: status: finished, type: DataFrame, key: finalize-db2f49b2b5259cc855a3e8252c279e26>"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 6.93 ms, sys: 352 µs, total: 7.29 ms\n",
"Wall time: 2.88 s\n"
]
}
],
"source": [
"%time e.replicate([test], n=48)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions from each of our models\n",
"\n",
"We'll use `e.submit(function, *args)` in a loop to submit more tasks."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def predict(est, X):\n",
" return est.predict(X[columns])\n",
"\n",
"predictions = [e.submit(predict, est, test) for est in estimators]\n",
"progress(predictions, complete=False)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([1, 1, 1, ..., 1, 1, 1])"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x = predictions[3].result()\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(328942,)"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x.shape"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Aggregate by Majority Vote"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from scipy.stats import mode\n",
"import numpy as np\n",
"\n",
"def mymode(*arrays):\n",
" array = np.stack(arrays, axis=0)\n",
" return mode(array)[0][0]"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[array([1, 2, 1, ..., 2, 2, 1]),\n",
" array([1, 1, 1, ..., 1, 1, 1]),\n",
" array([2, 1, 1, ..., 1, 1, 1]),\n",
" array([1, 1, 1, ..., 1, 1, 1])]"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"a_few_predictions = e.gather(predictions[:4])\n",
"a_few_predictions"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([1, 1, 1, ..., 1, 1, 1])"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mymode(*a_few_predictions)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tree-reduce predictions together into a single prediction\n",
"\n",
"We'll use `e.submit(...)` in a nested loop for more interesting tasks."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from toolz import partition_all\n",
"preds = predictions\n",
"while len(preds) > 1:\n",
" preds = [e.submit(mymode, *chunk) \n",
" for chunk in partition_all(10, preds)]\n",
"progress(preds, complete=False)"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"result = e.gather(preds)[0]"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([1, 1, 1, ..., 1, 1, 1])"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"result"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.67061974451423045"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"accuracy_score(test.result().passenger_count, result)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Too many single-passenger rides"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{1: 328922, 2: 20}"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from toolz import frequencies\n",
"frequencies(result)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{0: 132, 1: 299636, 2: 18977, 3: 2799, 4: 960, 5: 4196, 6: 2242}"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"frequencies(predictions[3].result())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Conclusion\n",
"\n",
"* We saw the dask.distributed task API\n",
" * `e.submit(function, *args)`\n",
" * `e.map(function, sequence)`\n",
" * `e.gather(futures)`\n",
"\n",
"* Our machine learning algorithms could improve\n",
"* Replicate with [dec2](https://github.com/dask/dec2/)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}