Last active
April 17, 2019 22:18
-
-
Save mortonjt/0bb8d0565fc6c02b48e91524e816f112 to your computer and use it in GitHub Desktop.
Closed form categorical Poisson regression
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import patsy | |
import pandas as pd | |
from biom import Table | |
# This is the main function | |
def poisson_cat(table, metadata, category, ref=None): | |
""" Poisson differential abundance. | |
Parameters | |
---------- | |
table : biom.Table | |
Sparse matrix of counts. | |
metadata : pd.DataFrame | |
Sample metadata file. | |
category : str | |
Metadata category of interest. | |
ref : str | |
Reference category. If not specified, the first | |
category will be picked. | |
Returns | |
------- | |
differential : pd.DataFrame | |
Differentials of all of the variables of interest. | |
Notes | |
----- | |
This only works on a single categorical variable. If you have continuous | |
variables, stop it. | |
This also assumes that the metadata and table have already been matched and | |
aligned. | |
TODO | |
---- | |
Extend this to multiple metadata categories, so that multiple columns | |
can be analyzed simultaneously | |
""" | |
if ref is None: | |
ref = metadata[category].unique()[0] | |
idx = metadata[category] == ref | |
x = idx.astype(np.int64).reshape(-1, 1) | |
y = table.matrix_data | |
A = y @ ~x # reference microbe counts | |
B = table.sum(axis='observation') | |
N = table.sum(axis='sample') | |
C = N[idx].sum() | |
D = N[~idx].sum() | |
diff = pd.Series(np.log((A * B * C) / (B * B * D - A)), | |
table.ids(axis='observation')) | |
return diff | |
# This is for testing | |
def random_block_table(reps, n_species, | |
species_mean=0, | |
species_var=1., | |
effect_size=1, | |
library_size=10000, | |
microbe_total=100000, microbe_kappa=0.3, | |
microbe_tau=0.1, sigma=0.5, seed=None): | |
""" Differential abundance analysis benchmarks. | |
The simulation here consists of 3 parts | |
Step 1: generate class probabilities using logistic distribution | |
Step 2: generate coefficients from normal distributions | |
Step 3: generate counts from species distributions | |
Parameters | |
---------- | |
reps : int | |
Number of replicate samples per test. | |
n_species : int | |
Number of species. | |
species_loc : float | |
Mean of the species prior. | |
species_variance : float | |
Variance of species log-fold differences | |
effect_size : int | |
The effect size difference between the feature abundances. | |
n_contaminants : int | |
Number of contaminant species. | |
sigma: float | |
Logistic error variance for class probabilities | |
library_size : np.array | |
A vector specifying the library sizes per sample. | |
template : np.array | |
A vector specifying feature abundances or relative proportions. | |
Returns | |
------- | |
generator of | |
pd.DataFrame | |
Ground truth tables. | |
pd.DataFrame | |
Metadata group categories, n_diff and effect_size | |
pd.Series | |
Species actually differentially abundant. | |
""" | |
state = check_random_state(seed) | |
data = [] | |
n = reps * 2 | |
k = 2 | |
labels = np.array([-effect_size] * (n // 2) + [effect_size] * (n // 2)) | |
eps = np.random.logistic(loc=0, scale=sigma, size=n) | |
class_probs = labels + eps | |
X = np.hstack((np.ones((n, 1)), class_probs.reshape(-1, 1))) | |
B = np.random.normal(loc=species_mean, scale=species_var, size=(k, n_species)) | |
## Helper functions | |
# Convert microbial abundances to counts | |
def to_counts_f(x): | |
n = state.lognormal(np.log(library_size), microbe_tau) | |
p = x / x.sum() | |
return state.poisson(state.lognormal(np.log(n*p), microbe_kappa)) | |
o_ids = ['F%d' % i for i in range(n_species)] | |
s_ids = ['S%d' % i for i in range(n)] | |
abs_table = pd.DataFrame(np.exp(X @ B) * microbe_total, | |
index=s_ids, | |
columns=o_ids) | |
rel_data = np.vstack(abs_table.apply(to_counts_f, axis=1)) | |
rel_table = pd.DataFrame(rel_data, | |
index=s_ids, | |
columns=o_ids) | |
metadata = pd.DataFrame({'labels': labels}) | |
metadata['effect_size'] = effect_size | |
metadata['microbe_total'] = microbe_total | |
metadata['class_logits'] = class_probs | |
metadata['intercept'] = 1 | |
metadata.index = s_ids | |
ground_truth = pd.DataFrame({ | |
'intercept': B[0, :], | |
'categorical': B[1, :] | |
}, index=o_ids) | |
return abs_table, rel_table, metadata, ground_truth | |
if __name__ == "__main__": | |
import unittest | |
class TestPoissonCat(unittest.TestCase): | |
def setUp(self): | |
num_samples = 100 | |
reps = 50 | |
n_species = 200 | |
np.random.seed(2) | |
self.res = random_block_table( | |
reps, n_species, | |
species_mean=0, | |
species_var=1, | |
microbe_kappa=0.7, | |
microbe_tau=0.7, | |
library_size=10000, | |
microbe_total=100000, | |
effect_size=1) | |
def test_poisson_cat(self): | |
abs_table, rel_table, metadata, ground_truth = self.res | |
table = Table(rel_table.T, rel_table.columns, rel_table.index) | |
res_diff = poisson_cat(table, metadata, category='label') | |
exp_diff = ground_truth['categorical'] | |
# This may not work -- prepare to debug | |
from scipy.stats import pearsonr | |
r, p = pearsonr(res_diff, exp_diff) | |
self.assertGreater(r, 0.5) | |
self.assertLess(p, 0.01) | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment