@hermidalc
Last active September 3, 2024 11:48
Building and evaluating ML models of RNA-seq count data using nested CV
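The code below comprises the main nested cross-validation script, followed by the scikit-learn wrapper module sklearn_edger.py that it imports and the R helper script sklearn_edger.R that the module sources. The --dataset argument must point to an .rds or .Rda file containing a Bioconductor ExpressionSet of raw RNA-seq counts whose pData includes a binary Class label (positive class coded as 1). A minimal invocation might look like the following sketch, in which the script name run_nested_cv.py and the dataset path are placeholders:

python run_nested_cv.py --dataset data/study_counts_eset.rds --scv-splits 4 --scv-repeats 5 --test-splits 4 --test-repeats 5

Per-split models, results, feature rankings, and figures are written under --results-dir (default: results).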
import atexit
import os
import re
import sys
from argparse import ArgumentParser
from decimal import Decimal
from glob import glob
from pprint import pprint
from shutil import rmtree
from tempfile import mkdtemp, gettempdir
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pandas.api.types import is_integer_dtype, is_float_dtype
import rpy2.robjects as robjects
from eli5 import explain_weights_df
from joblib import Memory, dump, parallel_backend
from rpy2.robjects import numpy2ri, pandas2ri
from rpy2.robjects.packages import importr
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (auc, average_precision_score,
balanced_accuracy_score, precision_recall_curve,
roc_auc_score, roc_curve)
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from tabulate import tabulate
numpy2ri.activate()
pandas2ri.activate()
from sklearn_edger import EdgeRFilterByExpr, EdgeRTMMLogCPM
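# Map a numbered pipeline step parameter (e.g. 'clf3__C') back to its generic
# step type ('clf__C') so it can be matched against the x-tick configuration
# lists defined further below.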
def get_param_type(param):
pipe_step_type_regex = re.compile(
r'^({})\d+$'.format('|'.join(pipeline_step_types)))
param_parts = param.split('__')
param_parts_start_idx = [i for i, p in enumerate(param_parts)
if pipe_step_type_regex.match(p)][-1]
param_parts[param_parts_start_idx] = pipe_step_type_regex.sub(
r'\1', param_parts[param_parts_start_idx])
param_type = '__'.join(param_parts[param_parts_start_idx:])
return param_type
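# Score a fitted pipeline on held-out samples, using decision_function when
# available and the positive-class predict_proba column otherwise; also records
# ROC and precision-recall curve points alongside the scalar metrics.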
def calculate_test_scores(pipe, X_test, y_test, test_sample_weights=None):
scores = {}
if hasattr(pipe, 'decision_function'):
y_score = pipe.decision_function(X_test)
else:
y_score = pipe.predict_proba(X_test)[:, 1]
scores['y_score'] = y_score
for metric in args.scv_scoring:
if metric == 'roc_auc':
scores[metric] = roc_auc_score(
y_test, y_score, sample_weight=test_sample_weights)
scores['fpr'], scores['tpr'], _ = roc_curve(
y_test, y_score, pos_label=1,
sample_weight=test_sample_weights)
elif metric == 'balanced_accuracy':
y_pred = pipe.predict(X_test)
scores['y_pred'] = y_pred
scores[metric] = balanced_accuracy_score(
y_test, y_pred, sample_weight=test_sample_weights)
elif metric == 'average_precision':
scores[metric] = average_precision_score(
y_test, y_score, sample_weight=test_sample_weights)
scores['pre'], scores['rec'], _ = precision_recall_curve(
y_test, y_score, pos_label=1,
sample_weight=test_sample_weights)
scores['pr_auc'] = auc(scores['rec'], scores['pre'])
return scores
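# Carry per-feature metadata through one fitted pipeline step: subset rows for
# feature selectors (get_support) and replicate metadata rows to match the
# derived feature names of transformers exposing get_feature_names.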
def transform_feature_meta(estimator, feature_meta):
if hasattr(estimator, 'get_support'):
transformed_feature_meta = feature_meta.loc[estimator.get_support()]
elif hasattr(estimator, 'get_feature_names'):
transformed_feature_meta = None
feature_names = estimator.get_feature_names(
input_features=feature_meta.index.values).astype(str)
for feature_name in feature_meta.index:
f_feature_meta = pd.concat(
[feature_meta.loc[[feature_name]]] * np.sum(
np.char.startswith(feature_names,
'{}_'.format(feature_name))),
axis=0, ignore_index=True)
if transformed_feature_meta is None:
transformed_feature_meta = f_feature_meta
else:
transformed_feature_meta = pd.concat(
[transformed_feature_meta, f_feature_meta],
axis=0, ignore_index=True)
transformed_feature_meta = (transformed_feature_meta
.set_index(feature_names))
else:
transformed_feature_meta = feature_meta
return transformed_feature_meta
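# Pass feature metadata through every fitted pipeline step, then join the eli5
# feature weights of the final estimator (or its wrapped estimator_) and drop
# features whose weight is exactly zero.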
def get_final_feature_meta(pipe, feature_meta):
for estimator in pipe:
feature_meta = transform_feature_meta(estimator, feature_meta)
final_estimator = pipe[-1]
feature_weights = explain_weights_df(
final_estimator, feature_names=feature_meta.index.values)
if feature_weights is None and hasattr(final_estimator, 'estimator_'):
feature_weights = explain_weights_df(
final_estimator.estimator_,
feature_names=feature_meta.index.values)
if feature_weights is not None:
feature_weights.set_index('feature', inplace=True,
verify_integrity=True)
feature_weights.columns = map(str.title, feature_weights.columns)
feature_meta = feature_meta.join(feature_weights, how='inner')
if (feature_meta['Weight'] == 0).any():
feature_meta = feature_meta.loc[feature_meta['Weight'] != 0]
feature_meta.index.rename('Feature', inplace=True)
return feature_meta
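# Accumulate, across outer CV splits, the best mean inner-CV score (and its
# standard deviation) obtained for each value of every hyperparameter that is
# actually varied in the grid.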
def add_param_cv_scores(search, param_grid_dict, param_cv_scores=None):
if param_cv_scores is None:
param_cv_scores = {}
for param, param_values in param_grid_dict.items():
if len(param_values) == 1:
continue
param_cv_values = search.cv_results_['param_{}'.format(param)]
if any(isinstance(v, BaseEstimator) for v in param_cv_values):
param_cv_values = np.array(
['.'.join([type(v).__module__, type(v).__qualname__])
if isinstance(v, BaseEstimator) else v
for v in param_cv_values])
if param not in param_cv_scores:
param_cv_scores[param] = {}
for metric in args.scv_scoring:
if metric not in param_cv_scores[param]:
param_cv_scores[param][metric] = {'scores': [], 'stdev': []}
param_metric_scores = param_cv_scores[param][metric]['scores']
param_metric_stdev = param_cv_scores[param][metric]['stdev']
for param_value_idx, param_value in enumerate(param_values):
mean_cv_scores = (search.cv_results_
['mean_test_{}'.format(metric)]
[param_cv_values == param_value])
std_cv_scores = (search.cv_results_
['std_test_{}'.format(metric)]
[param_cv_values == param_value])
if mean_cv_scores.size > 0:
if param_value_idx < len(param_metric_scores):
param_metric_scores[param_value_idx] = np.append(
param_metric_scores[param_value_idx],
mean_cv_scores[np.argmax(mean_cv_scores)])
param_metric_stdev[param_value_idx] = np.append(
param_metric_stdev[param_value_idx],
std_cv_scores[np.argmax(mean_cv_scores)])
else:
param_metric_scores.append(np.array(
[mean_cv_scores[np.argmax(mean_cv_scores)]]))
param_metric_stdev.append(np.array(
[std_cv_scores[np.argmax(mean_cv_scores)]]))
elif param_value_idx < len(param_metric_scores):
param_metric_scores[param_value_idx] = np.append(
param_metric_scores[param_value_idx], [np.nan])
param_metric_stdev[param_value_idx] = np.append(
param_metric_stdev[param_value_idx], [np.nan])
else:
param_metric_scores.append(np.array([np.nan]))
param_metric_stdev.append(np.array([np.nan]))
return param_cv_scores
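# Plot, for each varied hyperparameter, the mean inner-CV score per parameter
# value for every scoring metric, with a +/- 1 std. dev. band.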
def plot_param_cv_metrics(model_name, param_grid_dict, param_cv_scores):
cv_metric_colors = sns.color_palette('hls', len(args.scv_scoring))
for param in param_cv_scores:
mean_cv_scores, std_cv_scores = {}, {}
for metric in args.scv_scoring:
param_metric_scores = param_cv_scores[param][metric]['scores']
param_metric_stdev = param_cv_scores[param][metric]['stdev']
if any(len(scores) > 1 for scores in param_metric_scores):
mean_cv_scores[metric], std_cv_scores[metric] = [], []
for param_value_scores in param_metric_scores:
mean_cv_scores[metric].append(np.mean(param_value_scores))
std_cv_scores[metric].append(np.std(param_value_scores))
else:
mean_cv_scores[metric] = np.ravel(param_metric_scores)
std_cv_scores[metric] = np.ravel(param_metric_stdev)
plt.figure(figsize=(args.fig_width, args.fig_height))
param_type = get_param_type(param)
if param_type in params_lin_xticks:
x_axis = param_grid_dict[param]
if all(0 <= x <= 1 for x in x_axis):
if len(x_axis) <= 15:
plt.xticks(x_axis)
elif len(x_axis) <= 30:
plt.xticks(x_axis)
elif param_type in params_log_xticks:
x_axis = param_grid_dict[param]
plt.xscale('log')
elif param_type in params_fixed_xticks:
x_axis = range(len(param_grid_dict[param]))
xtick_labels = [v.split('.')[-1]
if param_type in pipeline_step_types
and not args.long_label_names
and v is not None else str(v)
for v in param_grid_dict[param]]
plt.xticks(x_axis, xtick_labels)
else:
raise RuntimeError('No ticks config exists for {}'
.format(param_type))
plt.xlim([min(x_axis), max(x_axis)])
plt.suptitle('Effect of {} on CV Performance Metrics'.format(param),
fontsize=args.title_font_size)
plt.title('{}'.format(model_name), fontsize=args.title_font_size - 1)
plt.xlabel(param, fontsize=args.axis_font_size)
plt.ylabel('CV Score', fontsize=args.axis_font_size)
for metric_idx, metric in enumerate(args.scv_scoring):
plt.plot(x_axis, mean_cv_scores[metric],
color=cv_metric_colors[metric_idx], lw=2, alpha=0.8,
label='Mean {}'.format(metric_label[metric]))
plt.fill_between(x_axis,
[m - s for m, s in zip(mean_cv_scores[metric],
std_cv_scores[metric])],
[m + s for m, s in zip(mean_cv_scores[metric],
std_cv_scores[metric])],
alpha=0.1, color=cv_metric_colors[metric_idx],
label=(r'$\pm$ 1 std. dev.'
if metric_idx == len(args.scv_scoring) - 1
else None))
plt.legend(loc='lower right', fontsize='medium')
plt.tick_params(labelsize=args.axis_font_size)
plt.grid(True, alpha=0.3)
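# Replace any joblib Memory objects in the pipeline with None so that the
# dumped split models do not reference the temporary cache directory.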
def unset_pipe_memory(pipe):
for param, param_value in pipe.get_params(deep=True).items():
if isinstance(param_value, Memory):
pipe.set_params(**{param: None})
return pipe
def run_cleanup():
rmtree(cachedir)
if args.parallel_backend == 'loky':
for rtmp_dir in glob('{}/Rtmp*/'.format(args.tmp_dir)):
rmtree(rtmp_dir)
def dir_path(path):
os.makedirs(path, mode=0o755, exist_ok=True)
return path
parser = ArgumentParser()
parser.add_argument('--dataset', type=str, required=True, help='dataset')
parser.add_argument('--scv-splits', type=int, required=True,
help='num inner cv splits')
parser.add_argument('--scv-repeats', type=int, required=True,
help='num inner cv repeats')
parser.add_argument('--test-splits', type=int, required=True,
help='num outer test splits')
parser.add_argument('--test-repeats', type=int, required=True,
help='num outer test repeats')
parser.add_argument('--scv-verbose', type=int, default=1, help='scv verbosity')
parser.add_argument('--scv-scoring', type=str, nargs='+',
choices=['roc_auc', 'balanced_accuracy',
'average_precision'],
default=['roc_auc', 'balanced_accuracy',
'average_precision'],
help='scv scoring metric')
parser.add_argument('--scv-refit', type=str,
choices=['roc_auc', 'balanced_accuracy',
'average_precision'],
default='roc_auc',
help='scv refit scoring metric')
parser.add_argument('--scv-error-score', type=str, default='nan',
help='scv error score')
parser.add_argument('--n-jobs', type=int, default=-2, help='num parallel jobs')
parser.add_argument('--parallel-backend', type=str, default='loky',
help='joblib parallel backend')
parser.add_argument('--results-dir', type=dir_path, default='results',
help='results dir')
parser.add_argument('--tmp-dir', type=dir_path, default=gettempdir(),
help='tmp dir')
parser.add_argument('--verbose', type=int, default=1, help='program verbosity')
parser.add_argument('--random-seed', type=int, default=777,
help='random state seed')
parser.add_argument('--model-code', type=str, default='lgr', help='model code')
parser.add_argument('--title-font-size', type=int, default=14,
help='figure title font size')
parser.add_argument('--axis-font-size', type=int, default=14,
help='figure axis font size')
parser.add_argument('--long-label-names', default=False, action='store_true',
help='figure long label names')
parser.add_argument('--fig-width', type=float, default=10,
help='figure width')
parser.add_argument('--fig-height', type=float, default=10,
help='figure height')
parser.add_argument('--fig-format', type=str, nargs='+',
choices=['png', 'pdf', 'svg', 'tif'], default=['png'],
help='figure format')
args = parser.parse_args()
pipeline_step_types = ('slr', 'trf', 'clf', 'rgr')
metric_label = {'roc_auc': 'ROC AUC',
'balanced_accuracy': 'BCR',
'average_precision': 'AVG PRE'}
params_lin_xticks = [
'slr__k',
'slr__max_features',
'slr__estimator__l1_ratio',
'slr__estimator__n_estimators',
'slr__n_neighbors',
'slr__sample_size',
'clf__n_features_to_select',
'clf__estimator__n_estimators',
'clf__degree',
'clf__l1_ratio',
'clf__n_neighbors',
'clf__n_estimators']
params_log_xticks = [
'slr__estimator__C',
'slr__estimator__learning_rate',
'clf__alpha',
'clf__C',
'clf__learning_rate',
'clf__estimator__C',
'clf__estimator__learning_rate',
'clf__base_estimator__C']
params_fixed_xticks = [
'slr',
'slr__estimator__class_weight',
'slr__estimator__max_depth',
'slr__estimator__max_features',
'slr__threshold',
'trf',
'clf',
'clf__class_weight',
'clf__kernel',
'clf__loss',
'clf__gamma',
'clf__weights',
'clf__max_depth',
'clf__estimator__class_weight',
'clf__estimator__max_depth',
'clf__estimator__max_features',
'clf__base_estimator__class_weight',
'clf__max_features']
params_k_selected_features = [
'slr__k',
'slr__max_features',
'clf__n_features_to_select']
# suppress linux conda qt5 wayland warning
if sys.platform.startswith('linux'):
os.environ['XDG_SESSION_TYPE'] = 'x11'
r_base = importr('base')
r_biobase = importr('Biobase')
robjects.r('set.seed({:d})'.format(args.random_seed))
atexit.register(run_cleanup)
cachedir = mkdtemp(dir=args.tmp_dir)
memory = Memory(location=cachedir, verbose=0)
if args.scv_error_score.isdigit():
args.scv_error_score = int(args.scv_error_score)
elif args.scv_error_score == 'nan':
args.scv_error_score = np.nan
python_warnings = ([os.environ['PYTHONWARNINGS']]
if 'PYTHONWARNINGS' in os.environ else [])
python_warnings.append(':'.join(
['ignore', 'Solver terminated early', 'UserWarning',
'sklearn.svm._base']))
python_warnings.append(':'.join(
['ignore', ('The max_iter was reached which means the coef_ did not '
'converge'), 'UserWarning', 'sklearn.linear_model._sag']))
python_warnings.append(':'.join(
['ignore', 'Persisting input arguments took', 'UserWarning']))
os.environ['PYTHONWARNINGS'] = ','.join(python_warnings)
cv_splitter = RepeatedStratifiedKFold(
n_splits=args.scv_splits, n_repeats=args.scv_repeats,
random_state=args.random_seed)
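# Model pipeline: edgeR filterByExpr low-count gene filtering -> edgeR TMM
# normalization with log-CPM transform -> per-feature standardization ->
# elastic-net penalized logistic regression (saga solver).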
pipe = Pipeline(
memory=memory,
steps=[('slr0', EdgeRFilterByExpr(is_classif=True)),
('trf1', EdgeRTMMLogCPM(prior_count=1)),
('trf2', StandardScaler()),
('clf3', LogisticRegression(
class_weight='balanced', max_iter=5000, penalty='elasticnet',
random_state=args.random_seed, solver='saga'))])
param_grid = {
'clf3__C': np.logspace(-3, 1, 5),
'clf3__l1_ratio': np.array([0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 0.99, 1.])}
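# Inner grid: 5 C values (1e-3 to 10) x 9 l1_ratio values = 45 candidates,
# each evaluated with the repeated stratified inner CV splitter above.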
search = GridSearchCV(
pipe, cv=cv_splitter, error_score=args.scv_error_score, n_jobs=args.n_jobs,
param_grid=param_grid, refit=args.scv_refit, return_train_score=False,
scoring=args.scv_scoring, verbose=args.scv_verbose)
dataset_name, file_extension = os.path.splitext(os.path.split(args.dataset)[1])
if not os.path.isfile(args.dataset) or file_extension not in (
'.Rda', '.rda', '.RData', '.Rdata', '.Rds', '.rds'):
    raise IOError('Dataset file does not exist or has an unsupported extension: {}'.format(args.dataset))
if file_extension in ('.Rda', '.rda', '.RData', '.Rdata'):
r_base.load(args.dataset)
eset = robjects.globalenv[dataset_name]
else:
eset = r_base.readRDS(args.dataset)
X = pd.DataFrame(r_base.t(r_biobase.exprs(eset)),
columns=r_biobase.featureNames(eset),
index=r_biobase.sampleNames(eset))
sample_meta = r_biobase.pData(eset)
y = np.array(sample_meta['Class'], dtype=int)
try:
feature_meta = r_biobase.fData(eset)
feature_meta_category_cols = (
feature_meta.select_dtypes(include='category').columns)
feature_meta[feature_meta_category_cols] = (
feature_meta[feature_meta_category_cols].astype(str))
except ValueError:
feature_meta = pd.DataFrame(index=r_biobase.featureNames(eset))
if dataset_name.split('_')[-1] == 'eset':
model_name = '_'.join([dataset_name.rpartition('_')[0], args.model_code])
else:
model_name = '_'.join([dataset_name, args.model_code])
results_dir = '{}/{}'.format(args.results_dir, model_name)
if args.verbose > 0:
print(search.__repr__(N_CHAR_MAX=10000))
if args.verbose > 0 or args.scv_verbose > 0:
print('Dataset:', dataset_name, X.shape)
test_splitter = RepeatedStratifiedKFold(
n_splits=args.test_splits, n_repeats=args.test_repeats,
random_state=args.random_seed)
if args.verbose > 0:
print('Test CV:', end=' ')
pprint(test_splitter)
split_models = []
split_results = []
param_cv_scores = {}
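# Nested CV: for each outer train/test split, fit the inner grid search on the
# training samples only (all preprocessing, including the edgeR filtering and
# TMM normalization steps, is re-fit within each inner fold via the Pipeline),
# then evaluate the refit best pipeline on the held-out outer test samples.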
for split_idx, (train_idxs, test_idxs) in enumerate(test_splitter.split(X, y)):
with parallel_backend(args.parallel_backend, inner_max_num_threads=1):
search.fit(X.iloc[train_idxs], y[train_idxs])
best_index = search.best_index_
best_params = search.best_params_
best_pipe = search.best_estimator_
split_scores = {'cv': {}}
for metric in args.scv_scoring:
split_scores['cv'][metric] = search.cv_results_[
'mean_test_{}'.format(metric)][best_index]
split_scores['te'] = calculate_test_scores(best_pipe, X.iloc[test_idxs],
y[test_idxs])
param_cv_scores = add_param_cv_scores(search, param_grid, param_cv_scores)
final_feature_meta = get_final_feature_meta(best_pipe, feature_meta)
if args.verbose > 0:
print('Model:', model_name, ' Split: {:>{width}d}'.format(
split_idx + 1, width=len(str(args.test_splits))), end=' ')
for metric in args.scv_scoring:
print(' {} (CV / Test): {:.4f} / {:.4f}'.format(
metric_label[metric], split_scores['cv'][metric],
split_scores['te'][metric]), end=' ')
if metric == 'average_precision':
print(' PR AUC Test: {:.4f}'.format(
split_scores['te']['pr_auc']), end=' ')
print(' Params:', {k: ('.'.join([type(v).__module__,
type(v).__qualname__])
if isinstance(v, BaseEstimator) else v)
for k, v in best_params.items()})
num_features = final_feature_meta.shape[0]
print(' Features: {:.0f}'.format(num_features))
if args.verbose > 1:
if 'Weight' in final_feature_meta.columns:
print(tabulate(final_feature_meta.iloc[
(-final_feature_meta['Weight'].abs()).argsort()],
floatfmt='.6e', headers='keys'))
else:
print(tabulate(final_feature_meta, headers='keys'))
split_result = {'feature_meta': final_feature_meta,
'scores': split_scores}
split_results.append(split_result)
best_pipe = unset_pipe_memory(best_pipe)
split_models.append(best_pipe)
memory.clear(warn=False)
os.makedirs(results_dir, mode=0o755, exist_ok=True)
dump(split_models, '{}/{}_split_models.pkl'
.format(results_dir, model_name))
dump(split_results, '{}/{}_split_results.pkl'
.format(results_dir, model_name))
dump(param_cv_scores, '{}/{}_param_cv_scores.pkl'
.format(results_dir, model_name))
scores = {'cv': {}, 'te': {}}
num_features = []
for split_result in split_results:
if split_result is None:
continue
for metric in args.scv_scoring:
if metric not in scores['cv']:
scores['cv'][metric] = []
scores['te'][metric] = []
scores['cv'][metric].append(
split_result['scores']['cv'][metric])
scores['te'][metric].append(
split_result['scores']['te'][metric])
if metric == 'average_precision':
if 'pr_auc' not in scores['te']:
scores['te']['pr_auc'] = []
scores['te']['pr_auc'].append(
split_result['scores']['te']['pr_auc'])
split_feature_meta = split_result['feature_meta']
num_features.append(split_feature_meta.shape[0])
print('Model:', model_name, end=' ')
for metric in args.scv_scoring:
print(' Mean {} (CV / Test): {:.4f} / {:.4f}'.format(
metric_label[metric], np.mean(scores['cv'][metric]),
np.mean(scores['te'][metric])), end=' ')
if metric == 'average_precision':
print(' Mean PR AUC Test: {:.4f}'.format(
np.mean(scores['te']['pr_auc'])), end=' ')
print(' Mean Features: {:.0f}'.format(np.mean(num_features)))
feature_annots = None
feature_weights = None
feature_scores = {}
for split_idx, split_result in enumerate(split_results):
split_feature_meta = split_result['feature_meta']
if feature_meta.columns.any():
if feature_annots is None:
feature_annots = split_feature_meta[feature_meta.columns]
else:
feature_annots = pd.concat(
[feature_annots,
split_feature_meta[feature_meta.columns]], axis=0)
elif feature_annots is None:
feature_annots = pd.DataFrame(index=split_feature_meta.index)
else:
feature_annots = pd.concat(
[feature_annots,
pd.DataFrame(index=split_feature_meta.index)], axis=0)
if 'Weight' in split_feature_meta.columns:
if feature_weights is None:
feature_weights = split_feature_meta[['Weight']].copy()
else:
feature_weights = feature_weights.join(
split_feature_meta[['Weight']], how='outer')
feature_weights.rename(
columns={'Weight': 'Weight {:d}'.format(split_idx + 1)},
inplace=True)
for metric in args.scv_scoring:
if metric not in feature_scores:
feature_scores[metric] = pd.DataFrame(
split_result['scores']['te'][metric], columns=[metric],
index=split_feature_meta.index)
else:
feature_scores[metric] = feature_scores[metric].join(
pd.DataFrame(split_result['scores']['te'][metric],
columns=[metric],
index=split_feature_meta.index),
how='outer')
feature_scores[metric].rename(columns={metric: split_idx},
inplace=True)
feature_annots = feature_annots.loc[
~feature_annots.index.duplicated(keep='first')]
feature_frequency = None
feature_results = None
feature_results_floatfmt = ['']
if feature_weights is not None:
feature_ranks = feature_weights.abs().rank(
ascending=False, method='min', na_option='keep')
feature_ranks.fillna(feature_ranks.count(axis=0) + 1, inplace=True)
feature_frequency = feature_weights.count(axis=1)
feature_weights.fillna(0, inplace=True)
feature_results = feature_annots.reindex(index=feature_ranks.index,
fill_value='')
for feature_annot_col in feature_annots.columns:
if is_integer_dtype(feature_annots[feature_annot_col]):
feature_results_floatfmt.append('.0f')
elif is_float_dtype(feature_annots[feature_annot_col]):
feature_results_floatfmt.append('.{:d}f'.format(
max(abs(Decimal(f).as_tuple().exponent)
for f in (feature_annots[feature_annot_col]
.astype(str)))))
else:
feature_results_floatfmt.append('')
feature_results['Frequency'] = feature_frequency
feature_results['Mean Weight Rank'] = feature_ranks.mean(axis=1)
feature_results['Mean Weight'] = feature_weights.mean(axis=1)
feature_results_floatfmt.extend(['.0f', '.1f', '.6e'])
for metric in args.scv_scoring:
if feature_results is None:
feature_results = feature_annots.reindex(
index=feature_scores[metric].index, fill_value='')
for feature_annot_col in feature_annots.columns:
if is_integer_dtype(feature_annots[feature_annot_col]):
feature_results_floatfmt.append('.0f')
elif is_float_dtype(feature_annots[feature_annot_col]):
feature_results_floatfmt.append('.{:d}f'.format(
max(abs(Decimal(f).as_tuple().exponent)
for f in (feature_annots[feature_annot_col]
.astype(str)))))
else:
feature_results_floatfmt.append('')
feature_frequency = feature_scores[metric].count(axis=1)
feature_results['Frequency'] = feature_frequency
feature_results_floatfmt.append('.0f')
feature_scores[metric].fillna(0.5, inplace=True)
if feature_scores[metric].mean(axis=1).nunique() > 1:
feature_results = feature_results.join(
pd.DataFrame({
'Mean Test {}'.format(metric_label[metric]):
feature_scores[metric].mean(axis=1)}),
how='left')
feature_results_floatfmt.append('.4f')
dump(feature_results, '{}/{}_feature_results.pkl'
.format(results_dir, model_name))
r_base.saveRDS(feature_results, '{}/{}_feature_results.rds'
.format(results_dir, model_name))
if feature_weights is not None:
dump(feature_weights, '{}/{}_feature_weights.pkl'
.format(results_dir, model_name))
r_base.saveRDS(feature_weights, '{}/{}_feature_weights.rds'
.format(results_dir, model_name))
if args.verbose > 0:
print('Overall Feature Ranking:')
if feature_weights is not None:
print(tabulate(
feature_results.sort_values(by='Mean Weight Rank'),
floatfmt=feature_results_floatfmt, headers='keys'))
else:
print(tabulate(
feature_results.sort_values(by='Mean Test {}'.format(
metric_label[args.scv_refit]), ascending=False),
floatfmt=feature_results_floatfmt, headers='keys'))
plot_param_cv_metrics(model_name, param_grid, param_cv_scores)
# plot roc and pr curves
sns.set_palette(sns.color_palette('hls', 2))
plt.figure(figsize=(args.fig_width, args.fig_height))
plt.suptitle('ROC Curve', fontsize=args.title_font_size)
plt.title('{}'.format(model_name), fontsize=args.title_font_size - 1)
plt.xlabel('False Positive Rate', fontsize=args.axis_font_size)
plt.ylabel('True Positive Rate', fontsize=args.axis_font_size)
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
tprs = []
mean_fpr = np.linspace(0, 1, 100)
for split_result in split_results:
tprs.append(np.interp(mean_fpr,
split_result['scores']['te']['fpr'],
split_result['scores']['te']['tpr']))
tprs[-1][0] = 0.0
plt.plot(split_result['scores']['te']['fpr'],
split_result['scores']['te']['tpr'], alpha=0.2,
color='darkgrey', lw=1)
mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_roc_auc = np.mean(scores['te']['roc_auc'])
std_roc_auc = np.std(scores['te']['roc_auc'])
mean_num_features = np.mean(num_features)
std_num_features = np.std(num_features)
plt.plot(mean_fpr, mean_tpr, lw=2, alpha=0.8, label=(
r'Test Mean ROC (AUC = {:.4f} $\pm$ {:.2f}, '
r'Features = {:.0f} $\pm$ {:.0f})').format(
mean_roc_auc, std_roc_auc, mean_num_features,
std_num_features))
std_tpr = np.std(tprs, axis=0)
tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
plt.fill_between(mean_fpr, tprs_lower, tprs_upper, alpha=0.1,
color='grey', label=r'$\pm$ 1 std. dev.')
plt.plot([0, 1], [0, 1], alpha=0.2, color='grey',
linestyle='--', lw=2, label='Chance')
plt.legend(loc='lower right', fontsize='medium')
plt.tick_params(labelsize=args.axis_font_size)
plt.grid(False)
sns.set_palette(sns.color_palette('hls', 10))
plt.figure(figsize=(args.fig_width, args.fig_height))
plt.suptitle('PR Curve', fontsize=args.title_font_size)
plt.title('{}'.format(model_name), fontsize=args.title_font_size - 1)
plt.xlabel('Recall', fontsize=args.axis_font_size)
plt.ylabel('Precision', fontsize=args.axis_font_size)
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
pres = []
mean_rec = np.linspace(0, 1, 100)
for split_result in split_results:
pres.append(np.interp(
mean_rec, split_result['scores']['te']['rec'][::-1],
split_result['scores']['te']['pre'][::-1]))
plt.step(split_result['scores']['te']['rec'],
split_result['scores']['te']['pre'], alpha=0.2,
color='darkgrey', lw=1, where='post')
mean_pre = np.mean(pres, axis=0)
mean_pr_auc = np.mean(scores['te']['pr_auc'])
std_pr_auc = np.std(scores['te']['pr_auc'])
mean_num_features = np.mean(num_features)
std_num_features = np.std(num_features)
plt.step(mean_rec, mean_pre, alpha=0.8, lw=2, where='post',
label=(r'Test Mean PR (AUC = {:.4f} $\pm$ {:.2f}, '
r'Features = {:.0f} $\pm$ {:.0f})').format(
mean_pr_auc, std_pr_auc, mean_num_features,
std_num_features))
std_pre = np.std(pres, axis=0)
pres_upper = np.minimum(mean_pre + std_pre, 1)
pres_lower = np.maximum(mean_pre - std_pre, 0)
plt.fill_between(mean_rec, pres_lower, pres_upper, alpha=0.1,
color='grey', label=r'$\pm$ 1 std. dev.')
plt.legend(loc='lower right', fontsize='medium')
plt.tick_params(labelsize=args.axis_font_size)
plt.grid(False)
for fig_num in plt.get_fignums():
plt.figure(fig_num, constrained_layout=True)
for fig_fmt in args.fig_format:
plt.savefig('{}/Figure_{:d}.{}'.format(results_dir, fig_num, fig_fmt),
bbox_inches='tight', format=fig_fmt)
plt.show()
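sklearn_edger.py, the scikit-learn wrapper module imported by the script above: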
import os
import numpy as np
import rpy2.robjects as robjects
from rpy2.robjects import numpy2ri, pandas2ri
from rpy2.robjects.packages import importr
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection._base import SelectorMixin
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import check_is_fitted
numpy2ri.deactivate()
pandas2ri.deactivate()
numpy2ri.activate()
pandas2ri.activate()
if 'edger_filterbyexpr_mask' not in robjects.globalenv:
r_base = importr('base')
    r_base.source(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sklearn_edger.R'))
r_edger_filterbyexpr_mask = robjects.globalenv['edger_filterbyexpr_mask']
r_edger_tmm_logcpm_fit = robjects.globalenv['edger_tmm_logcpm_fit']
r_edger_tmm_logcpm_transform = robjects.globalenv['edger_tmm_logcpm_transform']
class EdgeRFilterByExpr(SelectorMixin, BaseEstimator):
"""edgeR filterByExpr feature selector for count data
Parameters
----------
is_classif : bool (default = True)
Whether this is a classification design.
"""
def __init__(self, is_classif=True):
self.is_classif = is_classif
def fit(self, X, y):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Training counts data matrix.
y : array-like, shape = (n_samples,)
Training sample class labels.
Returns
-------
self : object
Returns self.
"""
X, y = check_X_y(X, y, dtype=int)
self._mask = np.array(r_edger_filterbyexpr_mask(
X, y, is_classif=self.is_classif), dtype=bool)
return self
def transform(self, X):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Input counts data matrix.
Returns
-------
Xr : array of shape (n_samples, n_selected_features)
edgeR filterByExpr counts data matrix with only the selected
features.
"""
check_is_fitted(self, '_mask')
X = check_array(X, dtype=int)
return super().transform(X)
def inverse_transform(self, X):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Input transformed data matrix.
        Raises
        ------
        NotImplementedError
            Inverse transformation is not implemented for this selector.
"""
raise NotImplementedError('inverse_transform not implemented.')
def _more_tags(self):
return {'requires_positive_X': True}
def _get_support_mask(self):
check_is_fitted(self, '_mask')
return self._mask
class EdgeRTMMLogCPM(TransformerMixin, BaseEstimator):
"""edgeR TMM normalization and log-CPM transformation for count data
Parameters
----------
prior_count : int (default = 1)
Average count to add to each observation to avoid taking log of zero.
        Larger values produce stronger moderation of the values for low
        counts and more shrinkage of the corresponding log fold changes.
Attributes
----------
ref_sample_ : array, shape (n_features,)
TMM normalization reference sample feature vector.
"""
def __init__(self, prior_count=1):
self.prior_count = prior_count
def fit(self, X, y=None):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Input counts data matrix.
y : ignored
"""
X = check_array(X, dtype=int)
self.ref_sample_ = np.array(r_edger_tmm_logcpm_fit(X), dtype=float)
return self
def transform(self, X):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Input counts data matrix.
Returns
-------
Xt : array of shape (n_samples, n_features)
edgeR TMM normalized log-CPM transformed data matrix.
"""
check_is_fitted(self, 'ref_sample_')
X = check_array(X, dtype=int)
X = np.array(r_edger_tmm_logcpm_transform(
X, ref_sample=self.ref_sample_, prior_count=self.prior_count),
dtype=float)
return X
def inverse_transform(self, X):
"""
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Input transformed data matrix.
        Raises
        ------
        NotImplementedError
            Inverse transformation is not implemented for this transformer.
"""
raise NotImplementedError('inverse_transform not implemented.')
def _more_tags(self):
return {'requires_positive_X': True}
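A standalone use of the two wrappers, outside the full modeling script above, might look like the following sketch; the toy count matrix is made up purely for illustration, and edgeR must be installed in the active R library:

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn_edger import EdgeRFilterByExpr, EdgeRTMMLogCPM

# Toy data: 20 samples x 100 genes of Poisson counts, two balanced classes.
rng = np.random.default_rng(0)
X = rng.poisson(50, size=(20, 100))
y = np.repeat([0, 1], 10)

# Drop low-count genes, then TMM-normalize and log-CPM transform the rest.
norm_pipe = Pipeline([('slr', EdgeRFilterByExpr(is_classif=True)),
                      ('trf', EdgeRTMMLogCPM(prior_count=1))])
X_logcpm = norm_pipe.fit_transform(X, y)

sklearn_edger.R, the R helper functions sourced by sklearn_edger.py: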
suppressPackageStartupMessages(library(edgeR))
edger_filterbyexpr_mask <- function(X, y, is_classif=TRUE) {
dge <- DGEList(counts=t(X))
if (is_classif) {
design <- model.matrix(~factor(y))
} else {
design <- NULL
}
return(filterByExpr(dge, design))
}
# adapted from edgeR::calcNormFactors source code
edger_tmm_ref_column <- function(counts, lib.size=colSums(counts), p=0.75) {
y <- t(t(counts) / lib.size)
f <- apply(y, 2, function(x) quantile(x, p=p))
    ref_column <- which.min(abs(f - mean(f)))
    return(ref_column)
}
edger_tmm_logcpm_fit <- function(X) {
counts <- t(X)
ref_sample <- counts[, edger_tmm_ref_column(counts)]
return(ref_sample)
}
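# At transform time the stored training reference sample is appended as an
# extra library so TMM normalization factors for new samples are computed
# against the training reference rather than the new data itself; the extra
# column is dropped again after the log-CPM values are computed.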
edger_tmm_logcpm_transform <- function(X, ref_sample, prior_count=1) {
counts <- t(X)
if (any(apply(counts, 2, function(c) all(c == ref_sample)))) {
dge <- DGEList(counts=counts)
dge <- calcNormFactors(dge, method="TMM")
log_cpm <- cpm(dge, log=TRUE, prior.count=prior_count)
} else {
counts <- cbind(counts, ref_sample)
colnames(counts) <- NULL
dge <- DGEList(counts=counts)
dge <- calcNormFactors(dge, method="TMM", refColumn=ncol(dge))
log_cpm <- cpm(dge, log=TRUE, prior.count=prior_count)
log_cpm <- log_cpm[, -ncol(log_cpm)]
}
return(t(log_cpm))
}