Slides of the presentation:
https://speakerdeck.com/ogrisel/parallel-and-large-scale-machine-learning-with-scikit-learn
*.pyc
*.npy
*.pkl
*.mmap
import os

from IPython.parallel import interactive


@interactive
def persist_cv_splits(name, X, y, n_cv_iter=5, suffix="_cv_%03d.pkl",
                      test_size=0.25, random_state=None):
    """Materialize randomized train/test splits of a dataset on disk."""
    # Imports are done inside the function body so that they are executed
    # in the namespace of the engine running the task.
    from sklearn.externals import joblib
    from sklearn.cross_validation import ShuffleSplit
    import os

    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
                      test_size=test_size, random_state=random_state)
    cv_split_filenames = []

    for i, (train, test) in enumerate(cv):
        cv_fold = (X[train], y[train], X[test], y[test])
        cv_split_filename = os.path.abspath(name + suffix % i)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)

    return cv_split_filenames
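A minimal usage sketch (not part of the original listing): assuming the digits dataset is loaded locally and the working directory lives on a filesystem shared with the engines (e.g. over NFS), the splits can be materialized once from the client process and the returned filenames reused when scheduling tasks. The 'digits' prefix is only illustrative.

# Hypothetical example: materialize 5 shuffled train/test splits on disk
from sklearn.datasets import load_digits

digits = load_digits()
digits_split_filenames = persist_cv_splits(
    'digits', digits.data, digits.target,
    n_cv_iter=5, test_size=0.25, random_state=0)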
import os

from IPython.parallel import interactive


def warm_mmap_on_cv_splits(client, cv_split_filenames):
    """Trigger a disk load on all the arrays of the CV splits.

    Assume the files are shared on all the hosts using NFS.
    """
    # First step: query the cluster to fetch one engine id per host
    all_engines = client[:]

    @interactive
    def hostname():
        import socket
        return socket.gethostname()

    hostnames = all_engines.apply(hostname).get_dict()
    one_engine_per_host = dict((hostname, engine_id)
                               for engine_id, hostname
                               in hostnames.items())
    hosts_view = client[list(one_engine_per_host.values())]

    # Second step: for each data file and host, mmap the arrays of the file
    # and trigger a sequential read of all the arrays' data
    @interactive
    def load_in_memory(filenames):
        from sklearn.externals import joblib
        for filename in filenames:
            arrays = joblib.load(filename, mmap_mode='r')
            for array in arrays:
                array.sum()  # trigger the disk read

    cv_split_filenames = [os.path.abspath(f) for f in cv_split_filenames]
    hosts_view.apply_sync(load_in_memory, cv_split_filenames)
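A hedged usage sketch, assuming an IPython cluster is already running (for example started with `ipcluster start`) and that the split files produced above are visible at the same paths on every host. The `array.sum()` pass above simply forces each memory-mapped array to be read sequentially so that its pages end up in the OS cache of every host; later `joblib.load(..., mmap_mode='r')` calls in the workers then avoid hitting the disk again.

# Hypothetical example: pre-load the CV split files in the OS cache of each host
from IPython.parallel import Client

client = Client()
warm_mmap_on_cv_splits(client, digits_split_filenames)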
from collections import namedtuple
from collections import defaultdict

from IPython.parallel import interactive
from IPython.parallel import TaskAborted
from scipy.stats import sem
import numpy as np

from sklearn.utils import check_random_state
try:
    # sklearn 0.14+
    from sklearn.grid_search import ParameterGrid
except ImportError:
    # sklearn 0.13
    from sklearn.grid_search import IterGrid as ParameterGrid

from mmap_utils import warm_mmap_on_cv_splits


def is_aborted(task):
    return isinstance(getattr(task, '_exception', None), TaskAborted)
@interactive
def compute_evaluation(model, cv_split_filename, params=None,
                       train_fraction=1.0, mmap_mode='r'):
    """Function executed on a worker to evaluate a model on a given CV split"""
    # All module imports should be executed in the worker namespace
    from time import time
    from sklearn.externals import joblib

    X_train, y_train, X_test, y_test = joblib.load(
        cv_split_filename, mmap_mode=mmap_mode)

    # Slice a subset of the training set for plotting learning curves
    n_samples_train = int(train_fraction * X_train.shape[0])
    X_train = X_train[:n_samples_train]
    y_train = y_train[:n_samples_train]

    # Configure the model
    if params is not None:
        model.set_params(**params)

    # Fit model and measure training time
    t0 = time()
    model.fit(X_train, y_train)
    train_time = time() - t0

    # Compute score on training set
    train_score = model.score(X_train, y_train)

    # Compute score on test set
    test_score = model.score(X_test, y_test)

    # Wrap evaluation results in a simple tuple datastructure
    return (test_score, train_score, train_time,
            train_fraction, params)
# Named tuple to collect evaluation results
Evaluation = namedtuple('Evaluation', (
    'validation_score',
    'train_score',
    'train_time',
    'train_fraction',
    'parameters'))
class RandomizedGridSearch(object):
    """Async randomized parameter search."""

    def __init__(self, load_balanced_view, random_state=0):
        self.task_groups = []
        self.lb_view = load_balanced_view
        self.random_state = random_state

    def map_tasks(self, f):
        return [f(task) for task_group in self.task_groups
                        for task in task_group]

    def abort(self):
        for task_group in self.task_groups:
            for task in task_group:
                if not task.ready():
                    try:
                        task.abort()
                    except AssertionError:
                        pass
        return self

    def wait(self):
        self.map_tasks(lambda t: t.wait())
        return self

    def completed(self):
        return sum(self.map_tasks(lambda t: t.ready() and not is_aborted(t)))

    def total(self):
        return sum(self.map_tasks(lambda t: 1))

    def progress(self):
        c = self.completed()
        if c == 0:
            return 0.0
        else:
            return float(c) / self.total()

    def reset(self):
        # Abort any previously scheduled tasks that are still pending
        self.abort()
        # Prepare for a new batch of evaluation tasks
        self.task_groups, self.all_parameters = [], []
    def launch_for_splits(self, model, parameter_grid, cv_split_filenames,
                          pre_warm=True):
        """Launch a Grid Search on precomputed CV splits."""
        # Abort any existing processing and erase previous state
        self.reset()
        self.parameter_grid = parameter_grid

        # Warm the OS disk cache on each host with sequential reads
        # XXX: fix me: interactive namespace issues to resolve
        # if pre_warm:
        #     warm_mmap_on_cv_splits(self.lb_view.client, cv_split_filenames)

        # Randomize the grid order
        random_state = check_random_state(self.random_state)
        self.all_parameters = list(ParameterGrid(parameter_grid))
        random_state.shuffle(self.all_parameters)

        for params in self.all_parameters:
            task_group = []

            for cv_split_filename in cv_split_filenames:
                task = self.lb_view.apply(compute_evaluation,
                    model, cv_split_filename, params=params)
                task_group.append(task)

            self.task_groups.append(task_group)

        # Make it possible to chain method calls
        return self
    def find_bests(self, n_top=5):
        """Compute the mean score of the completed tasks"""
        mean_scores = []

        for params, task_group in zip(self.all_parameters, self.task_groups):
            evaluations = [Evaluation(*t.get())
                           for t in task_group
                           if t.ready() and not is_aborted(t)]

            if len(evaluations) == 0:
                continue

            val_scores = [e.validation_score for e in evaluations]
            train_scores = [e.train_score for e in evaluations]
            mean_scores.append((np.mean(val_scores), sem(val_scores),
                                np.mean(train_scores), sem(train_scores),
                                params))

        return sorted(mean_scores, reverse=True)[:n_top]

    def report(self, n_top=5):
        bests = self.find_bests(n_top=n_top)
        output = "Progress: {0:02d}% ({1:03d}/{2:03d})\n".format(
            int(100 * self.progress()), self.completed(), self.total())
        for i, best in enumerate(bests):
            output += ("\nRank {0}: validation: {1:.5f} (+/-{2:.5f})"
                       " train: {3:.5f} (+/-{4:.5f}):\n {5}".format(
                       i + 1, *best))
        return output

    def __repr__(self):
        return self.report()
    def boxplot_parameters(self, display_train=False):
        """Plot a boxplot for each parameter independently"""
        import pylab as pl
        results = [Evaluation(*task.get())
                   for task_group in self.task_groups
                   for task in task_group
                   if task.ready() and not is_aborted(task)]

        n_rows = len(self.parameter_grid)
        pl.figure()
        for i, (param_name, param_values) in enumerate(self.parameter_grid.items()):
            pl.subplot(n_rows, 1, i + 1)
            val_scores_per_value = []
            train_scores_per_value = []

            for param_value in param_values:
                train_scores = [r.train_score for r in results
                                if r.parameters[param_name] == param_value]
                train_scores_per_value.append(train_scores)

                val_scores = [r.validation_score for r in results
                              if r.parameters[param_name] == param_value]
                val_scores_per_value.append(val_scores)

            widths = 0.25
            positions = np.arange(len(param_values)) + 1
            offset = 0
            if display_train:
                offset = 0.175
                pl.boxplot(train_scores_per_value, widths=widths,
                           positions=positions - offset)

            pl.boxplot(val_scores_per_value, widths=widths,
                       positions=positions + offset)
            pl.xticks(np.arange(len(param_values)) + 1, param_values)
            pl.xlabel(param_name)
            pl.ylabel("Val. Score")
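To tie the pieces together, here is a hedged end-to-end sketch (not from the original material). It assumes a running IPython cluster and the `digits_split_filenames` produced earlier; the SVC parameter ranges are only illustrative.

# Hypothetical example: asynchronous randomized search over an SVC grid
import numpy as np
from IPython.parallel import Client
from sklearn.svm import SVC

client = Client()
lb_view = client.load_balanced_view()

svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, -1, 4),
}

search = RandomizedGridSearch(lb_view)
search.launch_for_splits(SVC(), svc_params, digits_split_filenames)

# The tasks run in the background; partial results can be inspected at any time:
print(search.report(n_top=3))

# Once enough tasks have completed, the score distributions can also be plotted:
search.boxplot_parameters(display_train=True)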