@TomAugspurger, created September 29, 2017 14:15

import os
import os.path as op
from time import time

import dask.dataframe as ddf
import dask.array as da
from dask import compute
from distributed import Client


def make_categorical_data(n_samples=int(1e7), n_features=10):
    """Generate some random categorical data.

    The default parameters should generate around 1 GB of random integer data
    with increasing cardinality, along with a normally distributed real-valued
    target variable.
    """
    feature_names = ['f_%03d' % i for i in range(n_features)]
    features_series = [
        da.random.randint(low=0, high=(i + 1) * 10, size=n_samples,
                          chunks=n_samples // 10)
        for i in range(n_features)
    ]
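    # Wrap each dask array as a single-column dask dataframe so the columns
    # can be concatenated into one dataframe below.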
    features_series = [
        ddf.from_dask_array(col_data, columns=[feature_name])
        for col_data, feature_name in zip(features_series, feature_names)
    ]
    target = da.random.normal(loc=0, scale=1, size=n_samples,
                              chunks=n_samples // 10)
    target = ddf.from_dask_array(target, columns=['target'])
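    # Assemble the feature columns and the target into a single dataframe.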
    data = ddf.concat(features_series + [target], axis=1)
    data = data.repartition(npartitions=10)
    return data


def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that hold integer or object values are replaced by real-valued
    data representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]
    encode_columns = features_data.select_dtypes(['int', 'object']).columns
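    # One lazy groupby per column: the mean of the target within each category.
    # A single compute() call then evaluates all of the mappings at once.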
    mappings = [target_data.groupby(features_data[col]).mean()
                for col in encode_columns]
    mappings = compute(*mappings)
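    # Key each mapping (a pandas Series indexed by category) by the name of
    # the column it encodes.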
    mappings = {m.index.name: m for m in mappings}

    for col in encode_columns:
        features_data[col] = features_data[col].map(mappings[col])
    return ddf.concat([features_data, target_data], axis=1)


if __name__ == '__main__':
    # Make sure dask uses the distributed scheduler. Start the scheduler and
    # at least one worker first with:
    #   $ dask-scheduler
    #   $ dask-worker localhost:8786
    c = Client('localhost:8786')

    original_folder_name = op.abspath('random_categorical_data')
    encoded_folder_name = op.abspath('random_encoded_data')

    if not op.exists(original_folder_name):
        print("Generating random categorical data in", original_folder_name)
        os.mkdir(original_folder_name)
        data = make_categorical_data()
        data.to_parquet(original_folder_name)
print("Using data from", original_folder_name)
data = ddf.read_parquet(original_folder_name)
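    # Pin the data in distributed worker memory so the groupbys in the
    # encoding step do not have to re-read the parquet files from disk.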
    data = c.persist(data)

    print("Encoding categorical variables...")
    encoded = encode_with_target_mean(data, target_colname='target')

    print("Saving encoded data to", encoded_folder_name)
    t0 = time()
    # Repartition to get small parquet files in the output folder.
    encoded = encoded.repartition(npartitions=10)
    encoded.to_parquet(encoded_folder_name)
    print("done in %0.3fs" % (time() - t0))