@stsievert
Last active January 27, 2020 14:16
Criteo dataset example
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Download data from: http://labs.criteo.com/2013/12/download-terabyte-click-logs-2/. I only read in (part of) one day, `day_0`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Uncomment and run the cell below to generate the input data. It is commented out because it has not been tested."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# import dask.dataframe as dd\n",
"# from dask.distributed import Client\n",
"# client = Client()\n",
"\n",
"# categories = ['category_%d' % i for i in range(26)]\n",
"# columns = ['click'] + ['numeric_%d' % i for i in range(13)] + categories\n",
"\n",
"# df = dd.read_csv('data/day_0', sep='\\t', names=columns, header=None) \n",
"\n",
"# encoding = {c: 'bytes' for c in categories}\n",
"# fixed = {c: 8 for c in categories}\n",
"# df.to_parquet('data/day-0-bytes.parquet', object_encoding=encoding,\n",
"# fixed_text=fixed, compression='SNAPPY')\n",
"\n",
"# df_train, df_test = df.random_split([0.05, 0.95],\n",
"# random_state=42)\n",
"\n",
"# df_train.to_parquet('data/day-0-train-bytes.parquet', object_encoding=encoding,\n",
"# fixed_text=fixed, compression='SNAPPY')"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"from dask.distributed import Client\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:59115\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>17.18 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://127.0.0.1:59115' processes=8 cores=8>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client = Client(threads_per_worker=1, n_workers=8)\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"df = dd.read_parquet('data/day-0-train-bytes.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"df = df.sample(frac=0.01).persist()\n",
"df = df.compute()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"y = df['click'].values"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`Munge` and `Normalize` are still really rough: my only goal for these cells was to get something working."
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.base import BaseEstimator, TransformerMixin\n",
"from sklearn.preprocessing import quantile_transform\n",
"import numpy as np\n",
"\n",
"class Munge(BaseEstimator, TransformerMixin):\n",
" def __init__(self, threshold=0.001):\n",
" # sklearn expects every parameter to be an explicit keyword (no *args/**kwargs)\n",
" self.threshold = threshold\n",
" \n",
" def fit(self, X, y=None):\n",
" return self\n",
" \n",
" def transform(self, df):\n",
" # Get indicator variables\n",
" n = len(df)\n",
" print(\"df.shape =\", df.shape)\n",
" \n",
" df = pd.get_dummies(df, sparse=True)\n",
" print(\"with indicator variables df.shape =\", df.shape)\n",
" \n",
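" # keep a feature only if its column sum / n exceeds `threshold` (default 0.001):\n",
" # for indicator columns this is the fraction of rows in that category (0.1%), for numeric columns it is the mean\n",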
" feature_cols = [col for col in df.columns\n",
" if 'category' in col or 'numeric' in col]\n",
" \n",
" keep = [col for col in feature_cols if df[col].sum() / n > self.threshold]\n",
" df = df[keep]\n",
" print(\"after trimming feature matrix =\", df.shape)\n",
" \n",
" # missing values => {categorical: user doesn't have feature}\n",
" # numeric is a bit different. Most features are positive and fairly\n",
" # small, so I assume 0 is a good missing value (plus it's a good\n",
" # missing value for linear predictors)\n",
" df.fillna(value=0, inplace=True)\n",
" return df\n",
" \n",
"class Normalize(BaseEstimator, TransformerMixin):\n",
" def __init__(self, dist='normal'):\n",
" # dist is passed to quantile_transform as output_distribution ('normal' or 'uniform')\n",
" self.dist = dist\n",
" \n",
" def fit(self, X, y=None):\n",
" return self\n",
" \n",
" def transform(self, df):\n",
" categories = [col for col in df.columns if 'category' in col]\n",
" numeric = [col for col in df.columns if 'numeric' in col]\n",
" \n",
" # make the features approximately Gaussian\n",
" # This is used instead of StandardScaler to help\n",
" # handle outliers (which there are a lot of in this dataset)\n",
" numeric_features = quantile_transform(df[numeric].values,\n",
" output_distribution=self.dist)\n",
" categorical_features = df[categories].values\n",
" \n",
" num_cate = categorical_features.shape[1]\n",
" num_numerical = numeric_features.shape[1]\n",
" self.features_ = {'numeric': np.arange(num_cate, num_cate + num_numerical, dtype=int),\n",
" 'categorical': np.arange(num_cate, dtype=int)}\n",
" X = np.hstack((categorical_features, numeric_features))\n",
" print(\"feature matrix shape =\", X.shape)\n",
" X -= np.median(X, axis=0)\n",
" return X"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.pipeline import Pipeline\n",
"munge = Munge()\n",
"norm = Normalize()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"df.shape = (97955, 40)\n",
"with indicator variables df.shape = (97955, 226831)\n",
"after trimming feature matrix = (97955, 1669)\n",
"CPU times: user 3min 55s, sys: 4.51 s, total: 3min 59s\n",
"Wall time: 3min 55s\n"
]
}
],
"source": [
"%%time\n",
"df = munge.transform(df)"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"feature matrix shape = (97955, 1669)\n",
"CPU times: user 2.11 s, sys: 1.02 s, total: 3.12 s\n",
"Wall time: 3.08 s\n"
]
}
],
"source": [
"%%time\n",
"X = norm.transform(df)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[-0.6951397 , 0.79657621, 0.39199884, ..., 0. ,\n",
" 1.1001425 , 0.16129644],\n",
" [-5.19055549, -0.50224282, -5.10132452, ..., -5.30365568,\n",
" 0.81683179, -5.26839371],\n",
" [-0.24741611, 0.36678605, -0.30126316, ..., 0. ,\n",
" 0.50040387, -0.49565754],\n",
" ...,\n",
" [ 0.09167847, 1.45153507, -5.10132452, ..., -5.30365568,\n",
" 0.81297039, -0.20749966],\n",
" [ 0.17133449, -0.44051127, -0.30126316, ..., -5.30365568,\n",
" 1.53000694, -0.49565754],\n",
" [ 1.17818276, -0.5542076 , 0.77329052, ..., 0. ,\n",
" 0.52262393, 0.51592775]])"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"show = X[:, norm.features_['numeric']]\n",
"show"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<matplotlib.axes._subplots.AxesSubplot at 0x1c2036cf60>"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"summary = pd.DataFrame({'median': np.median(show, axis=0),\n",
" 'std': np.std(show, axis=0),\n",
" 'mean': np.mean(show, axis=0)})\n",
"summary.plot(style='o-')"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from sklearn.model_selection import train_test_split"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"X_train, X_test, y_train, y_test = train_test_split(X, y)"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.linear_model import LogisticRegression\n",
"from sklearn.metrics import log_loss, make_scorer\n",
"\n",
"class LogisticProbs(LogisticRegression):\n",
" def score(self, X, y):\n",
" # returns log loss, so lower is better (the opposite of sklearn's usual score convention)\n",
" prob_hat = self.predict_proba(X)\n",
" score = log_loss(y, prob_hat)\n",
" return score\n",
"\n",
"est = LogisticProbs(penalty='l2', # l1\n",
" C=1e2, # loguniform(5, -1)\n",
" class_weight='balanced', # or [1/0.97, 1/0.03]\n",
" solver='saga',\n",
" tol=1e-5, # loguniform(-3, -5)\n",
" n_jobs=-1)"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/ssievert/anaconda3/envs/dask-master/lib/python3.6/site-packages/sklearn/linear_model/sag.py:326: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge\n",
" \"the coef_ did not converge\", ConvergenceWarning)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3min 8s, sys: 2.25 s, total: 3min 10s\n",
"Wall time: 1min 50s\n"
]
},
{
"data": {
"text/plain": [
"LogisticProbs(C=100.0, class_weight='balanced', dual=False,\n",
" fit_intercept=True, intercept_scaling=1, max_iter=100,\n",
" multi_class='ovr', n_jobs=-1, penalty='l2', random_state=None,\n",
" solver='saga', tol=1e-05, verbose=0, warm_start=False)"
]
},
"execution_count": 42,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"est.fit(X_train, y_train)"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0.7100791417571256"
]
},
"execution_count": 43,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"est.score(X_test, y_test)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The best score on the 2014 [Criteo Kaggle] competition (using the same loss) is 0.44463 (lower is better).\n",
"\n",
"The result here is highly variable: rerunning with the same parameters but a different train/test split and a different random seed for SAGA in `LogisticProbs` has given scores of `[0.442, 0.9139, 0.7101]`. The fit above also stopped at `max_iter` without converging (see the `ConvergenceWarning`), which may contribute to this variance.\n",
"\n",
"[Criteo Kaggle]:https://www.kaggle.com/c/criteo-display-ad-challenge"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
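The notebook scores with log loss, the metric used in the Kaggle competition. As a worked sketch of what that number means (plain Python, with a hypothetical `log_loss` helper rather than `sklearn.metrics.log_loss`), the mean cross-entropy over n rows is -(1/n) Σ [y log p + (1 - y) log(1 - p)]. A useful reference point is the loss of always predicting the observed click rate: a model only adds information if it scores below that constant baseline. The toy labels and probabilities are made up for illustration.

```python
import math

def log_loss(y_true, p_click, eps=1e-15):
    """Mean binary cross-entropy: -(1/n) * sum(y*log(p) + (1-y)*log(1-p))."""
    total = 0.0
    for y, p in zip(y_true, p_click):
        p = min(max(p, eps), 1 - eps)  # clip so log(0) never happens
        total += y * math.log(p) + (1 - y) * math.log(1 - p)
    return -total / len(y_true)

def constant_baseline(y_true):
    """Log loss of always predicting the observed click rate."""
    rate = sum(y_true) / len(y_true)
    return log_loss(y_true, [rate] * len(y_true))

y = [1, 0, 0, 1]
print(round(log_loss(y, [0.9, 0.2, 0.1, 0.6]), 4))  # 0.2362
print(round(constant_baseline(y), 4))  # 0.6931, i.e. ln 2 for a 50% click rate
```

This should agree with `sklearn.metrics.log_loss` up to how extreme probabilities are clipped, which differs between versions.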
@Sandy4321

@stsievert
Author

stsievert commented Jan 27, 2020

It looks like (dask/dask-ml#295 (comment)) I ran this notebook locally, because it only uses a (very small) subset of the dataset. I don't recall ever using the complete Criteo dataset, or even a significant fraction of it.

@Sandy4321

But you use dask.distributed.
Also, the data is only 370 GB as a zipped file, at this link:
https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#criteo_tb

@stsievert
Author

stsievert commented Jan 27, 2020

I mostly used Distributed for its useful dashboard (for debugging, profiling, etc.). I didn't focus on actually scaling to the entire Criteo dataset; IIRC this simple use case illustrated some problems in Dask-ML.

My metric for "big data" is any data that's too large to fit in RAM. 370 GB is certainly more than the 16 GB of RAM my local machine has.
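Working with data larger than RAM usually means streaming over it instead of loading it all at once. As a minimal sketch (not code from the notebook, and untested on the real files), the pass below reads tab-separated Criteo rows line by line, assuming the layout the notebook uses (a `click` label, 13 numeric columns, then 26 categorical columns) and assuming missing values are empty fields. It counts how often each category value appears; the threshold filter mirrors the idea in `Munge`, which keeps indicator columns present in more than 0.1% of rows. `count_categories`, `frequent_categories`, and `fake_row` are hypothetical helpers.

```python
import io
from collections import Counter

# Column layout assumed from the notebook: click, 13 numeric, 26 categorical
N_NUMERIC, N_CATEGORICAL = 13, 26

def count_categories(lines):
    """Single pass over tab-separated rows. Memory grows with the number of
    distinct category values, not with the number of rows."""
    n_rows, n_clicks = 0, 0
    counts = [Counter() for _ in range(N_CATEGORICAL)]
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        n_rows += 1
        n_clicks += int(fields[0])
        # categorical fields start after the label and the numeric columns
        for j, value in enumerate(fields[1 + N_NUMERIC:]):
            if value:  # assumption: an empty field means "missing"
                counts[j][value] += 1
    return n_rows, n_clicks, counts

def frequent_categories(counts, n_rows, threshold=0.001):
    """Per column, the values seen in more than `threshold` of all rows."""
    return [{v for v, c in col.items() if c / n_rows > threshold}
            for col in counts]

def fake_row(click, first_category):
    """Hypothetical helper that builds one synthetic row for the demo."""
    fields = ([str(click)] + [""] * N_NUMERIC
              + [first_category] + [""] * (N_CATEGORICAL - 1))
    return "\t".join(fields) + "\n"

# io.StringIO stands in for an open file; both iterate line by line
data = io.StringIO(fake_row(1, "a") + fake_row(0, "a") + fake_row(0, "b"))
n_rows, n_clicks, counts = count_categories(data)
keep = frequent_categories(counts, n_rows, threshold=0.5)
print(n_rows, n_clicks, sorted(keep[0]))  # 3 1 ['a']
```

On the real data the same function could be given an open file handle, e.g. `count_categories(open('data/day_0'))`, since file objects also iterate one line at a time. With millions of distinct category values the counters can still get large, so this only removes the dependence on the number of rows.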

@Sandy4321
Copy link

I see.
Is Dask Distributed free?
And will it read data in libsvm format?

@stsievert
Copy link
Author

Is Dask Distributed free?

Yes. Free as in beer (i.e., doesn't cost money) and free as in speech (the source is freely available).

And will it read data in libsvm format?

Yes. Dask-ML is a wrapper around scikit-learn, and scikit-learn has a function for reading libsvm files: https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_svmlight_file.html. It'd be pretty simple to wrap that function with Dask:

from sklearn.datasets import load_svmlight_file
from dask.distributed import Client

def read_chunk(filename):
    # pass n_features=... here if every chunk must have the same number of columns
    X, y = load_svmlight_file(filename)
    return X, y  # scipy.sparse CSR matrix, NumPy array of labels

client = Client()

filenames = ["criteo-day-1.svmlight", ...]
# client.map submits one read_chunk task per file; the work runs on the
# cluster in the background, and Xs_ys is a list of futures
Xs_ys = client.map(read_chunk, filenames)

# continue with rest of notebook; client.gather(Xs_ys) returns the (X, y) pairs

This code is untested.
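For reference, `load_svmlight_file` reads the svmlight / libsvm text format, where each line is a label followed by `index:value` pairs for the nonzero features. The minimal pure-Python parser below is a hypothetical `parse_svmlight_line` helper, not scikit-learn's implementation; it is only meant to show what the format contains. It assumes 1-based feature indices as in libsvm unless `zero_based=True`, skips svmlight `qid:` entries, and ignores a trailing `#` comment.

```python
def parse_svmlight_line(line, zero_based=False):
    """Parse one '<label> <index>:<value> ...' line into (label, {column: value}).

    libsvm files normally number features from 1, so by default the indices
    are shifted down by one to become 0-based column positions.
    """
    line = line.split("#", 1)[0].strip()  # drop a trailing '# comment'
    label, *pairs = line.split()
    features = {}
    for pair in pairs:
        key, value = pair.split(":")
        if key == "qid":  # svmlight query id, not a feature
            continue
        column = int(key) if zero_based else int(key) - 1
        features[column] = float(value)
    return float(label), features

label, features = parse_svmlight_line("1 3:0.5 10:2 # one example")
print(label, features)  # 1.0 {2: 0.5, 9: 2.0}
```

In practice `load_svmlight_file` is far faster and returns a sparse matrix directly; the sketch just makes the line format concrete.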

@Sandy4321

Great code, thanks.
So Dask can help in both cases: reading the original RTB Criteo file or the libsvm format.
Only a short question:
In your code above, does load_svmlight_file read any libsvm-format file, or only a specific svmlight format?
Again, thanks a lot for taking care.
