{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.array as da\n",
"import dask.dataframe as dd\n",
"import numpy as np\n",
"import pandas as pd\n",
"from dask.distributed import Client"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:63466\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>17.18 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://127.0.0.1:63466' processes=8 cores=8>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client = Client()\n",
"client"
]
},
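{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch (not in the original run): Client() above starts a\n",
"# LocalCluster with defaults. An explicit configuration looks like this;\n",
"# the worker and memory numbers are assumptions, tune them to your machine.\n",
"# client = Client(n_workers=8, threads_per_worker=1, memory_limit='2GB')"
]
},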
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# categories = ['category_%d' % i for i in range(26)]\n",
"# columns = ['click'] + ['numeric_%d' % i for i in range(13)] + categories\n",
"\n",
"# df = dd.read_csv('data/day_0', sep='\\t', names=columns, header=None) \n",
"\n",
"# encoding = {c: 'bytes' for c in categories}\n",
"# fixed = {c: 8 for c in categories}\n",
"# df.to_parquet('data/day-0-bytes.parquet', object_encoding=encoding,\n",
"# fixed_text=fixed, compression='SNAPPY')\n",
"\n",
"# df_train, df_test = df.random_split([0.05, 0.95],\n",
"# random_state=42)\n",
"\n",
"# categories = ['category_%d' % i for i in range(26)]\n",
"# columns = ['click'] + ['numeric_%d' % i for i in range(13)] + categories\n",
"\n",
"# encoding = {c: 'bytes' for c in categories}\n",
"# fixed = {c: 8 for c in categories}\n",
"# df_train.to_parquet('data/day-0-train-bytes.parquet', object_encoding=encoding,\n",
"# fixed_text=fixed, compression='SNAPPY')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# df.to_parquet('data/sample.parquet')\n",
"# df = dd.read_parquet('data/sample.parquet').reset_index(drop=True)\n",
"\n",
"df = (\n",
" dd.read_parquet('data/day-0-train-bytes.parquet')\n",
" .repartition(npartitions=100)\n",
" .sample(frac=0.05, random_state=42)\n",
" .reset_index(drop=True)\n",
")"
]
},
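{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative (not in the original run): sanity-check the layout after\n",
"# the repartition/sample above -- row counts per partition.\n",
"df.map_partitions(len).compute()"
]
},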
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"categorical_columns = df.columns[df.columns.str.startswith('category')]\n",
"numeric_columns = df.columns[df.columns.str.startswith('numeric')]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"df[categorical_columns.tolist()] = df[categorical_columns].astype('category')\n",
"y = df['click']\n",
"X = df.drop(\"click\", axis='columns')\n",
"\n",
"X, y = dask.persist(X, y)\n",
"\n",
"# compute\n",
"X = X.categorize()\n",
"X = X.persist()"
]
},
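{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative (not in the original run): .astype('category') leaves the\n",
"# categories 'unknown'; .categorize() scans the data so every partition\n",
"# shares one global category set, which the encoders below require.\n",
"all(X[c].cat.known for c in categorical_columns)"
]
},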
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# compute\n",
"n_uniques, = dask.compute([X[col].nunique_approx() for col in categorical_columns])\n",
"n_uniques = pd.Series(dict(zip(categorical_columns, n_uniques)))\n",
"\n",
"onehot_columns = n_uniques[n_uniques < 100].index.tolist()\n",
"ordinal_columns = n_uniques.index.drop(onehot_columns).tolist()\n",
"\n",
"classes = [0, 1]"
]
},
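{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative (not in the original run): inspect the approximate\n",
"# cardinalities that drove the one-hot vs. ordinal split above.\n",
"n_uniques.sort_values(ascending=False).head()"
]
},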
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"from dask_ml.compose import make_column_transformer\n",
"from dask_ml.preprocessing import QuantileTransformer, OneHotEncoder, OrdinalEncoder\n",
"from dask_ml.wrappers import Incremental\n",
"from sklearn.pipeline import make_pipeline\n",
"from sklearn.preprocessing import FunctionTransformer\n",
"from sklearn.linear_model import SGDClassifier\n",
"\n",
"import sklearn.base\n",
"\n",
"\n",
"class SillyNaNImputer(sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):\n",
" def fit(self, X, y=None):\n",
" self.mean_ = X.mean(axis=0).compute()\n",
" return self\n",
" \n",
" def transform(self, X, y=None):\n",
" return X.fillna(self.mean_)\n",
" \n",
"\n",
"values_extractor = FunctionTransformer(lambda x: x.values, validate=False)"
]
},
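{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative toy check of SillyNaNImputer (not part of the original\n",
"# flow): NaNs are replaced by the per-column means learned in fit().\n",
"toy = dd.from_pandas(pd.DataFrame({'a': [1.0, np.nan, 3.0]}), npartitions=2)\n",
"SillyNaNImputer().fit(toy).transform(toy).compute()"
]
},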
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Pipeline(memory=None,\n",
" steps=[('columntransformer-1', ColumnTransformer(n_jobs=1, remainder='passthrough', sparse_threshold=0.3,\n",
" transformer_weights=None,\n",
" transformers=[('onehotencoder', OneHotEncoder(categorical_features=None, categories='auto',\n",
" dtype=<class 'numpy.float64'>, handle_unknown='erro...ion=0.1, verbose=0, warm_start=False),\n",
" random_state=None, scoring=None, shuffle_blocks=True))])"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"onehot_encoder = OneHotEncoder(sparse=False)\n",
"ordinal_encoder = OrdinalEncoder()\n",
"nan_imputer = SillyNaNImputer()\n",
"\n",
"to_numeric = make_column_transformer(\n",
" (onehot_columns, onehot_encoder),\n",
" (ordinal_columns, ordinal_encoder),\n",
" remainder='passthrough',\n",
")\n",
"fill_na = make_column_transformer(\n",
" (numeric_columns, nan_imputer),\n",
" remainder='passthrough'\n",
")\n",
"\n",
"clf = Incremental(SGDClassifier(random_state=0))\n",
"\n",
"pipe = make_pipeline(to_numeric, fill_na, values_extractor, clf)\n",
"pipe"
]
},
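{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative (not in the original run): make_pipeline derives step names\n",
"# from the lowercased class names, so the Incremental wrapper is addressed\n",
"# as 'incremental' -- which is where the `incremental__classes` fit\n",
"# parameter in the next cell comes from.\n",
"[name for name, step in pipe.steps]"
]
},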
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/taugspurger/sandbox/dask/dask/dataframe/multi.py:602: UserWarning: Concatenating dataframes with unknown divisions.\n",
"We're assuming that the indexes of each dataframes are \n",
"aligned. This assumption is not generally safe.\n",
" warn(\"Concatenating dataframes with unknown divisions.\\n\"\n"
]
},
{
"data": {
"text/plain": [
"Pipeline(memory=None,\n",
" steps=[('columntransformer-1', ColumnTransformer(n_jobs=1, remainder='passthrough', sparse_threshold=0.3,\n",
" transformer_weights=None,\n",
" transformers=[('onehotencoder', OneHotEncoder(categorical_features=None, categories='auto',\n",
" dtype=<class 'numpy.float64'>, handle_unknown='erro...ion=0.1, verbose=0, warm_start=False),\n",
" random_state=None, scoring=None, shuffle_blocks=True))])"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pipe.fit(X, y.values, incremental__classes=classes)"
]
},
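{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch (not in the original run): the fitted pipeline\n",
"# predicts lazily, returning a dask array of class labels.\n",
"y_pred = pipe.predict(X)\n",
"y_pred"
]
},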
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/taugspurger/sandbox/dask/dask/dataframe/multi.py:602: UserWarning: Concatenating dataframes with unknown divisions.\n",
"We're assuming that the indexes of each dataframes are \n",
"aligned. This assumption is not generally safe.\n",
" warn(\"Concatenating dataframes with unknown divisions.\\n\"\n"
]
},
{
"data": {
"text/plain": [
"0.945875199333547"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pipe.score(X, y.values)"
]
}
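,
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Caveat (illustrative, not in the original run): the score above is on the\n",
"# training data. For a held-out estimate, split first -- mirroring the\n",
"# random_split used in the commented data-prep cell:\n",
"# train, test = df.random_split([0.8, 0.2], random_state=42)"
]
}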
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}