Last active
July 20, 2021 23:30
-
-
Save hsteinshiromoto/4ba05756a9ad887be70288984f013d8c to your computer and use it in GitHub Desktop.
bayesiansearchcv.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys
import traceback
from collections.abc import Iterable
from typing import Union

import numpy as np
import pandas as pd
import scipy as sp
from hyperopt import STATUS_FAIL, STATUS_OK, Trials, fmin, hp, space_eval, tpe
from sklearn.metrics import get_scorer
from sklearn.model_selection import KFold, StratifiedKFold
class BayesSearchCV:
    """Hyperparameter search using hyperopt, evaluated over cross-validation folds.

    For every fold, hyperopt's ``fmin`` minimises a weighted loss built from the
    squared deviations between each metric in ``scoring`` and its optimal value.
    The estimator refitted with the fold's best hyperparameters is then scored on
    the fold's validation split, and the estimator with the lowest validation
    loss is kept.

    Attributes set by ``fit``:
        cv_results_ (pd.DataFrame): One row per fold with ``<metric>_loss``
            columns, the aggregated ``loss`` and the ``cv_iteration`` index,
            sorted by ascending ``loss``.
        min_loss (float): Lowest validation loss found.
        best_estimator_: Estimator fitted on the training split of the best fold.
        best_hyperparameters_ (dict): Hyperparameters of ``best_estimator_``.
    """

    def __init__(self, estimator, param_distributions: dict, scoring: dict,
                 n_iter: int = 10, weights_matrix: np.ndarray = None,
                 cv: Union[int, Iterable] = 5, random_state: int = None,
                 algo=tpe.suggest, trials: Trials = None) -> None:
        """Store the search configuration.

        Args:
            estimator: Estimator instance (or estimator class) to tune.
            param_distributions (dict): hyperopt search space, keyed by the
                estimator's constructor argument names.
            scoring (dict): ``{metric: opt_value}`` mapping of scorer names
                (resolved with ``sklearn.metrics.get_scorer``) to the value
                each scorer should ideally reach.
            n_iter (int, optional): Number of new hyperopt evaluations run for
                each fold. Defaults to 10.
            weights_matrix (np.ndarray, optional): Symmetric positive definite
                matrix weighting the metrics in the loss. Defaults to the
                identity matrix of size ``len(scoring)``.
            cv (int or Iterable, optional): Number of K-fold splits, an object
                exposing ``split(X, y)``, or an iterable of
                ``(train_index, val_index)`` pairs. Defaults to 5.
            random_state (int, optional): Seed for the K-fold shuffle used when
                ``cv`` is an int. No shuffling is done when None.
                Defaults to None.
            algo (optional): hyperopt suggestion algorithm.
                Defaults to tpe.suggest.
            trials (Trials, optional): hyperopt trials store. A fresh
                ``Trials()`` is created per instance when None.
        """
        self.estimator = estimator
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.random_state = random_state
        # `weights_matrix or ...` would raise on a real ndarray (ambiguous
        # truth value), so test against None explicitly.
        if weights_matrix is None:
            weights_matrix = np.identity(len(scoring))
        self.weights_matrix = np.asarray(weights_matrix, dtype=float)
        self.cv = cv
        self.algo = algo
        # A `Trials()` default argument would be shared by every instance.
        self.trials = Trials() if trials is None else trials
        self.scoring = scoring

    def fit(self, X: pd.DataFrame, y=None) -> Union[dict, None]:
        """Find optimal hyperparameters and fit the estimator.

        Args:
            X (pd.DataFrame): Predictors (pandas objects and array-likes are
                both sliced by row position).
            y (pd.DataFrame, optional): Target.

        Returns:
            None on success. If hyperopt raises a ``KeyError`` the traceback is
            printed and a ``{'status': STATUS_FAIL, 'exception': ...}`` dict is
            returned instead.

        Raises:
            ValueError: If the weights matrix is not symmetric positive
                definite or its size does not match the number of metrics.
        """
        self.cv_results_ = pd.DataFrame()
        self.min_loss = np.inf

        if not self._check_spd(self.weights_matrix):
            msg = "Expected weights matrix to be symmetric positive definite."
            raise ValueError(msg)

        n_metrics = len(self.scoring)
        if np.shape(self.weights_matrix) != (n_metrics, n_metrics):
            msg = (f"Expected weights matrix of shape {(n_metrics, n_metrics)}, "
                   f"got {np.shape(self.weights_matrix)}.")
            raise ValueError(msg)

        fold_results = []

        for iteration, (train_index, val_index) in enumerate(self._get_splits(X, y)):
            X_train = self._take_rows(X, train_index)
            X_val = self._take_rows(X, val_index)
            y_train = self._take_rows(y, train_index)
            y_val = self._take_rows(y, val_index)

            # The objective fits and scores on the training split of the fold.
            def objective(space, X_fold=X_train, y_fold=y_train):
                return self._cost(X=X_fold, y=y_fold, hyperparameters=space)

            try:
                # `max_evals` is an absolute count over the trials store, so it
                # must grow with the stored history or later folds would run
                # zero new evaluations.
                best = fmin(fn=objective, space=self.param_distributions,
                            algo=self.algo,
                            max_evals=len(self.trials.trials) + self.n_iter,
                            trials=self.trials)
            except KeyError:
                exc_info = sys.exc_info()
                traceback.print_exception(*exc_info)
                return {'status': STATUS_FAIL,
                        'exception': str(exc_info)}

            # fmin reports hp.choice parameters as indices keyed by label;
            # space_eval maps the result back to actual parameter values.
            hyperparameters = space_eval(self.param_distributions, best)

            estimator = self._instantiate_estimator(X_train, y_train,
                                                    hyperarameters=hyperparameters)
            loss_df, current_loss = self._cost(X_val, y_val, hyperparameters,
                                               estimator=estimator,
                                               return_loss_df=True)
            loss_df["cv_iteration"] = iteration

            if current_loss < self.min_loss:
                self.min_loss = current_loss
                self.best_estimator_ = estimator
                self.best_hyperparameters_ = hyperparameters

            fold_results.append(loss_df)

        if fold_results:
            results = pd.concat(fold_results, ignore_index=True)
            # Only the per-metric columns get the suffix; "loss" and
            # "cv_iteration" keep their names.
            results = results.rename(
                columns={metric: f"{metric}_loss" for metric in self.scoring})
            results = results.sort_values(by="loss").reset_index(drop=True)
            self.cv_results_ = results

        return None

    @staticmethod
    def _take_rows(data, index):
        """Select rows by position from a pandas object or an array-like.

        Args:
            data: pandas object, array-like, or None.
            index: Integer positions of the rows to keep.

        Returns:
            The selected rows, or None when ``data`` is None.
        """
        if data is None:
            return None
        if hasattr(data, "iloc"):
            # Plain `df[index]` would select columns, not rows.
            return data.iloc[index]
        return np.asarray(data)[index]

    def _get_splits(self, X: pd.DataFrame, y=None):
        """Yield the training and validation indices of every fold.

        Args:
            X (pd.DataFrame): Predictor
            y (pd.DataFrame): Target

        Yields:
            tuple: ``(train_index, val_index)`` for one fold.

        Raises:
            NotImplementedError: If ``cv`` is neither an int, an object with a
                ``split`` method, nor an iterable of index pairs.
        """
        cv = self.cv

        if isinstance(cv, (int, np.integer)):
            # KFold only accepts a random_state together with shuffle=True.
            cv = KFold(n_splits=int(cv),
                       shuffle=self.random_state is not None,
                       random_state=self.random_state)

        if hasattr(cv, "split"):
            yield from cv.split(X, y)
        elif isinstance(cv, Iterable):
            # Pre-computed (train_index, val_index) pairs.
            yield from cv
        else:
            msg = f"Cross validation not yet implemented for type {type(self.cv)}"
            raise NotImplementedError(msg)

    def _cost(self, X: pd.DataFrame, y: pd.DataFrame, hyperparameters: dict,
              estimator=None, return_loss_df: bool = False) -> Union[dict, tuple]:
        """Evaluate the weighted loss of an estimator on ``X`` and ``y``.

        Each metric contributes the squared deviation between its optimal value
        and the achieved score; the deviations vector ``d`` is aggregated as
        ``d @ weights_matrix @ d``.

        Args:
            X (pd.DataFrame): Predictor
            y (pd.DataFrame): Target
            hyperparameters (dict): Estimator hyperparameters, used only when
                ``estimator`` is None.
            estimator (optional): Already fitted estimator to score. When None
                a new one is fitted on ``X`` and ``y``.
            return_loss_df (bool): Return the per-metric loss frame as well.

        Returns:
            ``(loss_df, loss)`` when ``return_loss_df`` is True, otherwise the
            hyperopt result dict ``{'loss': sqrt(loss), 'status': STATUS_OK}``.
        """
        # Identity check: some estimators define __len__, so truthiness is not
        # a reliable "was an estimator passed" test.
        if estimator is None:
            estimator = self._instantiate_estimator(X, y, hyperparameters)

        loss_dict = {}
        for p_metric, opt_value in self.scoring.items():
            scorer = get_scorer(p_metric)
            loss_dict[p_metric] = [(opt_value - scorer(estimator, X, y)) ** 2]

        loss_df = pd.DataFrame.from_dict(loss_dict)
        deviations = loss_df.to_numpy(dtype=float).ravel()
        weights = np.asarray(self.weights_matrix, dtype=float)
        # Reduce to a plain float so comparisons and hyperopt get a scalar
        # rather than a (1, 1) array.
        loss = float(deviations @ weights @ deviations)
        loss_df["loss"] = loss

        if return_loss_df:
            return loss_df, loss

        return {'loss': float(np.sqrt(loss)), 'status': STATUS_OK}

    def _instantiate_estimator(self, X: pd.DataFrame, y: pd.DataFrame,
                               hyperarameters: dict):
        """Build a new estimator with the given hyperparameters and fit it.

        Args:
            X (pd.DataFrame): Predictors
            y (pd.DataFrame): Target
            hyperarameters (dict): Estimator hyperparameters, passed as
                constructor keyword arguments.

        Returns:
            The fitted estimator.
        """
        # Accept either an estimator instance or the estimator class itself.
        if isinstance(self.estimator, type):
            estimator_cls = self.estimator
        else:
            estimator_cls = type(self.estimator)

        estimator = estimator_cls(**hyperarameters)
        estimator.fit(X, y)
        return estimator

    @staticmethod
    def _check_spd(m: np.ndarray, rtol: float = 1e-6, atol: float = 1e-9) -> bool:
        """Check whether a matrix is symmetric positive definite.

        Args:
            m (np.ndarray): Matrix
            rtol (float): Relative tolerance to verify if m is symmetric
            atol (float): Absolute tolerance to verify if m is symmetric

        Returns:
            (bool): True if matrix is symmetric positive definite
        """
        m = np.asarray(m)

        if m.ndim != 2 or m.shape[0] != m.shape[1]:
            return False

        if not np.allclose(m, m.T, rtol=rtol, atol=atol):
            return False

        try:
            # Cholesky factorisation fails for non positive definite input.
            np.linalg.cholesky(m)
        except np.linalg.LinAlgError:
            return False

        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment