-
-
Save Sandy4321/aa1f1881e18b7c7b6c48e4b61212c15f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask\n", | |
"import dask.array as da\n", | |
"import dask.dataframe as dd\n", | |
"import numpy as np\n", | |
"import pandas as pd\n", | |
"from dask.distributed import Client" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Client</h3>\n", | |
"<ul>\n", | |
" <li><b>Scheduler: </b>tcp://127.0.0.1:63466\n", | |
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Cluster</h3>\n", | |
"<ul>\n", | |
" <li><b>Workers: </b>8</li>\n", | |
" <li><b>Cores: </b>8</li>\n", | |
" <li><b>Memory: </b>17.18 GB</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: scheduler='tcp://127.0.0.1:63466' processes=8 cores=8>" | |
] | |
}, | |
"execution_count": 2, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Start a dask.distributed client (no address given, so a local cluster); the displayed repr shows the scheduler address and dashboard link.\n", | |
"client = Client()\n", | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# One-time preprocessing, kept (commented out) for provenance. It reads the raw\n", | |
"# tab-separated 'data/day_0' file, writes it to parquet, then writes a 5% random\n", | |
"# split (df_train) to 'data/day-0-train-bytes.parquet', which the next cell loads.\n", | |
"\n", | |
"# categories = ['category_%d' % i for i in range(26)]\n", | |
"# columns = ['click'] + ['numeric_%d' % i for i in range(13)] + categories\n", | |
"\n", | |
"# df = dd.read_csv('data/day_0', sep='\\t', names=columns, header=None) \n", | |
"\n", | |
"# encoding = {c: 'bytes' for c in categories}\n", | |
"# fixed = {c: 8 for c in categories}\n", | |
"# df.to_parquet('data/day-0-bytes.parquet', object_encoding=encoding,\n", | |
"# fixed_text=fixed, compression='SNAPPY')\n", | |
"\n", | |
"# df_train, df_test = df.random_split([0.05, 0.95],\n", | |
"# random_state=42)\n", | |
"\n", | |
"# categories, columns, encoding and fixed are the same as defined above.\n", | |
"# df_train.to_parquet('data/day-0-train-bytes.parquet', object_encoding=encoding,\n", | |
"# fixed_text=fixed, compression='SNAPPY')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# df.to_parquet('data/sample.parquet')\n", | |
"# df = dd.read_parquet('data/sample.parquet').reset_index(drop=True)\n", | |
"\n", | |
"df = (\n", | |
" dd.read_parquet('data/day-0-train-bytes.parquet')\n", | |
" .repartition(npartitions=100)\n", | |
" .sample(frac=0.05, random_state=42)\n", | |
" .reset_index(drop=True)\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"categorical_columns = df.columns[df.columns.str.startswith('category')]\n", | |
"numeric_columns = df.columns[df.columns.str.startswith('numeric')]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"df[categorical_columns.tolist()] = df[categorical_columns].astype('category')\n", | |
"y = df['click']\n", | |
"X = df.drop(\"click\", axis='columns')\n", | |
"\n", | |
"X, y = dask.persist(X, y)\n", | |
"\n", | |
"# compute\n", | |
"X = X.categorize()\n", | |
"X = X.persist()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# compute\n", | |
"n_uniques, = dask.compute([X[col].nunique_approx() for col in categorical_columns])\n", | |
"n_uniques = pd.Series(dict(zip(categorical_columns, n_uniques)))\n", | |
"\n", | |
"onehot_columns = n_uniques[n_uniques < 100].index.tolist()\n", | |
"ordinal_columns = n_uniques.index.drop(onehot_columns).tolist()\n", | |
"\n", | |
"classes = [0, 1]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from dask_ml.compose import make_column_transformer\n", | |
"from dask_ml.preprocessing import QuantileTransformer, OneHotEncoder, OrdinalEncoder\n", | |
"from dask_ml.wrappers import Incremental\n", | |
"from sklearn.pipeline import make_pipeline\n", | |
"from sklearn.preprocessing import FunctionTransformer\n", | |
"from sklearn.linear_model import SGDClassifier\n", | |
"\n", | |
"import sklearn.base\n", | |
"\n", | |
"\n", | |
"class SillyNaNImputer(sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):\n", | |
" def fit(self, X, y=None):\n", | |
" self.mean_ = X.mean(axis=0).compute()\n", | |
" return self\n", | |
" \n", | |
" def transform(self, X, y=None):\n", | |
" return X.fillna(self.mean_)\n", | |
" \n", | |
"\n", | |
"values_extractor = FunctionTransformer(lambda x: x.values, validate=False)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"Pipeline(memory=None,\n", | |
" steps=[('columntransformer-1', ColumnTransformer(n_jobs=1, remainder='passthrough', sparse_threshold=0.3,\n", | |
" transformer_weights=None,\n", | |
" transformers=[('onehotencoder', OneHotEncoder(categorical_features=None, categories='auto',\n", | |
" dtype=<class 'numpy.float64'>, handle_unknown='erro...ion=0.1, verbose=0, warm_start=False),\n", | |
" random_state=None, scoring=None, shuffle_blocks=True))])" | |
] | |
}, | |
"execution_count": 11, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"onehot_encoder = OneHotEncoder(sparse=False)\n", | |
"ordinal_encoder = OrdinalEncoder()\n", | |
"nan_imputer = SillyNaNImputer()\n", | |
"\n", | |
"to_numeric = make_column_transformer(\n", | |
" (onehot_columns, onehot_encoder),\n", | |
" (ordinal_columns, ordinal_encoder),\n", | |
" remainder='passthrough',\n", | |
")\n", | |
"fill_na = make_column_transformer(\n", | |
" (numeric_columns, nan_imputer),\n", | |
" remainder='passthrough'\n", | |
")\n", | |
"\n", | |
"clf = Incremental(SGDClassifier(random_state=0))\n", | |
"\n", | |
"pipe = make_pipeline(to_numeric, fill_na, values_extractor, clf)\n", | |
"pipe" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"/Users/taugspurger/sandbox/dask/dask/dataframe/multi.py:602: UserWarning: Concatenating dataframes with unknown divisions.\n", | |
"We're assuming that the indexes of each dataframes are \n", | |
"aligned. This assumption is not generally safe.\n", | |
" warn(\"Concatenating dataframes with unknown divisions.\\n\"\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"Pipeline(memory=None,\n", | |
" steps=[('columntransformer-1', ColumnTransformer(n_jobs=1, remainder='passthrough', sparse_threshold=0.3,\n", | |
" transformer_weights=None,\n", | |
" transformers=[('onehotencoder', OneHotEncoder(categorical_features=None, categories='auto',\n", | |
" dtype=<class 'numpy.float64'>, handle_unknown='erro...ion=0.1, verbose=0, warm_start=False),\n", | |
" random_state=None, scoring=None, shuffle_blocks=True))])" | |
] | |
}, | |
"execution_count": 12, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# `incremental__classes` is routed to the auto-named 'incremental' step (the Incremental\n", | |
"# wrapper); the full label set has to be given up front when training with partial_fit.\n", | |
"pipe.fit(X, y.values, incremental__classes=classes)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 39, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"/Users/taugspurger/sandbox/dask/dask/dataframe/multi.py:602: UserWarning: Concatenating dataframes with unknown divisions.\n", | |
"We're assuming that the indexes of each dataframes are \n", | |
"aligned. This assumption is not generally safe.\n", | |
" warn(\"Concatenating dataframes with unknown divisions.\\n\"\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"0.945875199333547" | |
] | |
}, | |
"execution_count": 39, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# NOTE(review): this scores on the same X, y that were passed to fit, so the number is\n", | |
"# training-set performance (mean accuracy for a classifier by default), not a held-out\n", | |
"# estimate. TODO: score a separate split and compare against the majority-class rate.\n", | |
"pipe.score(X, y.values)" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.5" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment