Slides of the presentation:
https://speakerdeck.com/ogrisel/parallel-and-large-scale-machine-learning-with-scikit-learn
*.pyc
*.npy
*.pkl
*.mmap
import os
from IPython.parallel import interactive


@interactive
def persist_cv_splits(name, X, y, n_cv_iter=5, suffix="_cv_%03d.pkl",
                      test_size=0.25, random_state=None):
    """Materialize randomized train test splits of a dataset."""
    from sklearn.externals import joblib
    from sklearn.cross_validation import ShuffleSplit
    import os

    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
                      test_size=test_size, random_state=random_state)
    cv_split_filenames = []

    for i, (train, test) in enumerate(cv):
        cv_fold = (X[train], y[train], X[test], y[test])
        cv_split_filename = os.path.abspath(name + suffix % i)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)

    return cv_split_filenames
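
A minimal usage sketch (the digits dataset and the 'digits' file prefix are illustrative choices, not part of the original code):

from sklearn.datasets import load_digits

digits = load_digits()

# Write 5 randomized train/test splits to digits_cv_000.pkl, digits_cv_001.pkl, ...
# and collect their absolute paths to hand to the worker tasks later on.
digits_split_filenames = persist_cv_splits('digits', digits.data, digits.target,
                                           n_cv_iter=5, random_state=0)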
import os
from IPython.parallel import interactive


def warm_mmap_on_cv_splits(client, cv_split_filenames):
    """Trigger a disk load on all the arrays of the CV splits

    Assume the files are shared on all the hosts using NFS.
    """
    # First step: query cluster to fetch one engine id per host
    all_engines = client[:]

    @interactive
    def hostname():
        import socket
        return socket.gethostname()

    hostnames = all_engines.apply(hostname).get_dict()
    one_engine_per_host = dict((hostname, engine_id)
                               for engine_id, hostname
                               in hostnames.items())
    hosts_view = client[one_engine_per_host.values()]

    # Second step: for each data file and host, mmap the arrays of the file
    # and trigger a sequential read of all the arrays' data
    @interactive
    def load_in_memory(filenames):
        from sklearn.externals import joblib
        for filename in filenames:
            arrays = joblib.load(filename, mmap_mode='r')
            for array in arrays:
                array.sum()  # trigger the disk read

    cv_split_filenames = [os.path.abspath(f) for f in cv_split_filenames]
    hosts_view.apply_sync(load_in_memory, cv_split_filenames)
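
A rough sketch of how this warm-up could be triggered from the client side, assuming an IPython cluster is already running (e.g. started with ipcluster) and that the split files written by persist_cv_splits are visible at the same paths on every host; the variable names are illustrative:

from IPython.parallel import Client

client = Client()

# Sequentially read each CV split file once on one engine per host so the
# data ends up in the OS disk cache before the search is launched.
warm_mmap_on_cv_splits(client, digits_split_filenames)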
from collections import namedtuple
from collections import defaultdict

from IPython.parallel import interactive
from IPython.parallel import TaskAborted
from scipy.stats import sem
import numpy as np

from sklearn.utils import check_random_state
try:
    # sklearn 0.14+
    from sklearn.grid_search import ParameterGrid
except ImportError:
    # sklearn 0.13
    from sklearn.grid_search import IterGrid as ParameterGrid

from mmap_utils import warm_mmap_on_cv_splits


def is_aborted(task):
    return isinstance(getattr(task, '_exception', None), TaskAborted)
@interactive
def compute_evaluation(model, cv_split_filename, params=None,
                       train_fraction=1.0, mmap_mode='r'):
    """Function executed on a worker to evaluate a model on a given CV split"""
    # All module imports should be executed in the worker namespace
    from time import time
    from sklearn.externals import joblib

    X_train, y_train, X_test, y_test = joblib.load(
        cv_split_filename, mmap_mode=mmap_mode)

    # Slice a subset of the training set for plotting learning curves
    n_samples_train = int(train_fraction * X_train.shape[0])
    X_train = X_train[:n_samples_train]
    y_train = y_train[:n_samples_train]

    # Configure the model
    if params is not None:
        model.set_params(**params)

    # Fit model and measure training time
    t0 = time()
    model.fit(X_train, y_train)
    train_time = time() - t0

    # Compute score on training set
    train_score = model.score(X_train, y_train)

    # Compute score on test set
    test_score = model.score(X_test, y_test)

    # Wrap evaluation results in a simple tuple datastructure
    return (test_score, train_score, train_time,
            train_fraction, params)
# Named tuple to collect evaluation results
Evaluation = namedtuple('Evaluation', (
    'validation_score',
    'train_score',
    'train_time',
    'train_fraction',
    'parameters'))
class RandomizedGridSeach(object):
    """Async Randomized Parameter search."""

    def __init__(self, load_balanced_view, random_state=0):
        self.task_groups = []
        self.lb_view = load_balanced_view
        self.random_state = random_state

    def map_tasks(self, f):
        return [f(task) for task_group in self.task_groups
                        for task in task_group]

    def abort(self):
        for task_group in self.task_groups:
            for task in task_group:
                if not task.ready():
                    try:
                        task.abort()
                    except AssertionError:
                        pass
        return self

    def wait(self):
        self.map_tasks(lambda t: t.wait())
        return self

    def completed(self):
        return sum(self.map_tasks(lambda t: t.ready() and not is_aborted(t)))

    def total(self):
        return sum(self.map_tasks(lambda t: 1))

    def progress(self):
        c = self.completed()
        if c == 0:
            return 0.0
        else:
            return float(c) / self.total()

    def reset(self):
        # Abort any other previously scheduled tasks
        self.map_tasks(lambda t: t.abort())

        # Schedule a new batch of evaluation tasks
        self.task_groups, self.all_parameters = [], []
    def launch_for_splits(self, model, parameter_grid, cv_split_filenames,
                          pre_warm=True):
        """Launch a Grid Search on precomputed CV splits."""
        # Abort any existing processing and erase previous state
        self.reset()
        self.parameter_grid = parameter_grid

        # Warm the OS disk cache on each host with sequential reads
        # XXX: fix me: interactive namespace issues to resolve
        # if pre_warm:
        #     warm_mmap_on_cv_splits(self.lb_view.client, cv_split_filenames)

        # Randomize the grid order
        random_state = check_random_state(self.random_state)
        self.all_parameters = list(ParameterGrid(parameter_grid))
        random_state.shuffle(self.all_parameters)

        for params in self.all_parameters:
            task_group = []

            for cv_split_filename in cv_split_filenames:
                task = self.lb_view.apply(compute_evaluation,
                    model, cv_split_filename, params=params)
                task_group.append(task)

            self.task_groups.append(task_group)

        # Make it possible to chain method calls
        return self
    def find_bests(self, n_top=5):
        """Compute the mean score of the completed tasks"""
        mean_scores = []

        for params, task_group in zip(self.all_parameters, self.task_groups):
            evaluations = [Evaluation(*t.get())
                           for t in task_group
                           if t.ready() and not is_aborted(t)]

            if len(evaluations) == 0:
                continue

            val_scores = [e.validation_score for e in evaluations]
            train_scores = [e.train_score for e in evaluations]

            mean_scores.append((np.mean(val_scores), sem(val_scores),
                                np.mean(train_scores), sem(train_scores),
                                params))

        return sorted(mean_scores, reverse=True)[:n_top]
    def report(self, n_top=5):
        bests = self.find_bests(n_top=n_top)
        output = "Progress: {0:02d}% ({1:03d}/{2:03d})\n".format(
            int(100 * self.progress()), self.completed(), self.total())
        for i, best in enumerate(bests):
            output += ("\nRank {0}: validation: {1:.5f} (+/-{2:.5f})"
                       " train: {3:.5f} (+/-{4:.5f}):\n {5}".format(
                       i + 1, *best))
        return output

    def __repr__(self):
        return self.report()
    def boxplot_parameters(self, display_train=False):
        """Plot a boxplot for each parameter independently"""
        import pylab as pl
        results = [Evaluation(*task.get())
                   for task_group in self.task_groups
                   for task in task_group
                   if task.ready() and not is_aborted(task)]

        n_rows = len(self.parameter_grid)
        pl.figure()
        for i, (param_name, param_values) in enumerate(self.parameter_grid.items()):
            pl.subplot(n_rows, 1, i + 1)
            val_scores_per_value = []
            train_scores_per_value = []
            for param_value in param_values:
                train_scores = [r.train_score for r in results
                                if r.parameters[param_name] == param_value]
                train_scores_per_value.append(train_scores)

                val_scores = [r.validation_score for r in results
                              if r.parameters[param_name] == param_value]
                val_scores_per_value.append(val_scores)

            widths = 0.25
            positions = np.arange(len(param_values)) + 1
            offset = 0
            if display_train:
                offset = 0.175
                pl.boxplot(train_scores_per_value, widths=widths,
                           positions=positions - offset)

            pl.boxplot(val_scores_per_value, widths=widths,
                       positions=positions + offset)

            pl.xticks(np.arange(len(param_values)) + 1, param_values)
            pl.xlabel(param_name)
            pl.ylabel("Val. Score")
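
Finally, a hedged end-to-end sketch of how the class might be driven from the client; the SVC model, the parameter grid and the variable names below are illustrative and not prescribed by the code above:

from IPython.parallel import Client
from sklearn.svm import SVC
import numpy as np

client = Client()
lb_view = client.load_balanced_view()

# Illustrative grid: 4 values of C times 5 values of gamma
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}

search = RandomizedGridSeach(lb_view)
search.launch_for_splits(SVC(), svc_params, digits_split_filenames)

# The tasks run asynchronously: partial results can be polled at any time
print(search.report(n_top=3))
search.boxplot_parameters(display_train=True)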