Created
May 10, 2020 13:01
-
-
Save pavlin-policar/a090d87f5dba1a98abd3d152794613e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import OrderedDict | |
from typing import Iterable | |
import numpy as np | |
import scipy.sparse as sp | |
import scipy.stats as stats | |
import pandas as pd | |
def FDR(p_values: Iterable, dependent=False, m=None, ordered=False) -> Iterable: | |
"""False Discovery Rate correction for multiple comparisons. | |
Parameters | |
---------- | |
p_values: Union[list, np.ndarray] | |
p-values. | |
dependent: bool | |
use correction for dependent hypotheses (default False). | |
m: int | |
number of hypotheses tested (default ``len(p_values)``). | |
ordered: bool | |
prevent sorting of p-values if they are already sorted (default False). | |
Returns | |
------- | |
Union[list, np.ndarray] | |
Same as the input | |
""" | |
if p_values is None or len(p_values) == 0 or (m is not None and m <= 0): | |
return None | |
is_list = isinstance(p_values, list) | |
p_values = np.array(p_values) | |
if m is None: | |
m = len(p_values) | |
if not ordered: | |
ordered = (np.diff(p_values) >= 0).all() | |
if not ordered: | |
indices = np.argsort(p_values) | |
p_values = p_values[indices] | |
if dependent: # correct q for dependent tests | |
m *= sum(1 / np.arange(1, m + 1)) | |
fdrs = (p_values * m / np.arange(1, len(p_values) + 1))[::-1] | |
fdrs = np.array(np.minimum.accumulate(fdrs)[::-1]) | |
if not ordered: | |
fdrs[indices] = fdrs.copy() | |
return fdrs if not is_list else list(fdrs) | |
def mannwhitneyu(*args, **kwargs): | |
try: | |
return stats.mannwhitneyu(*args, **kwargs).pvalue | |
except ValueError: | |
return 1 | |
def differential_expression( | |
adata, group_by="ClusterID", layer=None, alternative="greater", test_type="wilcox", | |
lfc_thresh=0.25, fdr_thresh=0.05, min_pct=0.1, n_jobs=1, | |
): | |
if test_type not in ("wilcox",): | |
raise ValueError() | |
if alternative not in ("two-sided", "less", "greater"): | |
raise ValueError() | |
if layer is None: | |
X = adata.X | |
else: | |
X = adata.layers[layer] | |
results = [] | |
n_groups = len(adata.obs[group_by].unique()) | |
n_genes = len(adata.var_names) | |
for group_idx, group_id in enumerate(sorted(adata.obs[group_by].unique())): | |
mask = adata.obs[group_by].values == group_id | |
print( | |
f"Testing {n_genes} genes for group {group_id}, " | |
f"{mask.sum()} cells ({group_idx + 1} / {n_groups}) {test_type}" | |
) | |
# Filter out genes that are expressed in too few cells | |
pct_expressed_group = np.ravel(np.mean(X[ mask] > 0, axis=0)) | |
pct_expressed_other = np.ravel(np.mean(X[~mask] > 0, axis=0)) | |
mask_pct_expressed = pct_expressed_group >= min_pct | |
# Filter out genes with too low log-fold change | |
means_group = np.ravel(np.mean(X[mask], axis=0)) | |
means_other = np.ravel(np.mean(X[~mask], axis=0)) | |
log_fold_change = np.log((means_group + 1e-5) / (means_other + 1e-5)) / np.log(2) | |
mask_lfc = np.abs(log_fold_change) >= lfc_thresh | |
def compare_groups(gene_idx): | |
x0 = X[ mask, gene_idx] | |
x1 = X[~mask, gene_idx] | |
if sp.issparse(x0): | |
x0 = x0.toarray() | |
if sp.issparse(x1): | |
x1 = x1.toarray() | |
x0 = np.ravel(x0) | |
x1 = np.ravel(x1) | |
if test_type == "wilcox": | |
pval = mannwhitneyu(x0, x1, alternative=alternative) | |
return OrderedDict([ | |
("gene_id", adata.var_names[gene_idx]), | |
("group_id", group_id), | |
("n_group", x0.shape[0]), | |
("n_other", x1.shape[0]), | |
("n_expressed_group", np.sum(x0 > 0)), | |
("n_expressed_other", np.sum(x1 > 0)), | |
("avg_expression_group", means_group[gene_idx]), | |
("avg_expression_other", means_other[gene_idx]), | |
("pct_expressed_group", pct_expressed_group[gene_idx]), | |
("pct_expressed_other", pct_expressed_other[gene_idx]), | |
("log_fold_change", log_fold_change[gene_idx]), | |
("test_type", test_type), | |
("pvalue", pval), | |
]) | |
final_gene_mask = mask_pct_expressed & mask_lfc | |
with ThreadPoolExecutor(max_workers=n_jobs) as executor: | |
results.extend(executor.map(compare_groups, np.argwhere(final_gene_mask).ravel())) | |
df = pd.DataFrame(results) | |
df["fdr"] = FDR(df["pvalue"].values, m=n_genes) | |
df = df.loc[df["fdr"] <= fdr_thresh] | |
return df | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment