Created
March 24, 2026 14:51
-
-
Save betatim/f616d838a4b5bad6fc179a7369a557a5 to your computer and use it in GitHub Desktop.
A generated workflow that performs "single cell" analysis. Its main purpose is to provide a realistic baseline pipeline that a model can accelerate.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import tarfile | |
| import time | |
| import urllib.request | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| import numpy as np | |
| import scipy.io | |
| import scipy.sparse | |
| from sklearn.decomposition import TruncatedSVD | |
| from sklearn.neighbors import NearestNeighbors | |
| from sklearn.preprocessing import StandardScaler | |
| import hdbscan | |
| import umap | |
# ---------------------------------------------------------------------------
# Timing helpers
# ---------------------------------------------------------------------------
# Maps stage name -> elapsed seconds, in execution (insertion) order.
_timings = OrderedDict()
class timed:
    """Context manager that measures the wall-clock time of its body.

    On exit, the elapsed duration is recorded in the module-level
    ``_timings`` mapping under the name given at construction.
    """

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        elapsed = time.perf_counter() - self.start
        _timings[self.name] = elapsed
def print_header(n_cells, n_genes):
    """Print a banner summarizing the dataset dimensions."""
    bar = "=" * 70
    mode = "Results"
    banner = [
        bar,
        " Single-Cell RNA-seq Analysis Pipeline (sparse matrix path)",
        f" Mode: {mode}",
        f" Cells: {n_cells:,}",
        f" Genes: {n_genes:,}",
        bar,
    ]
    print("\n".join(banner))
def print_timings():
    """Print every recorded stage duration followed by the grand total."""
    total = sum(_timings.values())
    header = [
        "",
        "=" * 70,
        " TIMING SUMMARY",
        "-" * 70,
    ]
    for line in header:
        print(line)
    for name, elapsed in _timings.items():
        print(f" {name:40s} {elapsed:>8.2f}s")
    print("-" * 70)
    print(f" {'TOTAL':40s} {total:>8.2f}s")
    print("=" * 70)
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
# Public 10x Genomics download of the filtered PBMC 68K gene/barcode matrices.
_DATA_URL = (
    "https://cf.10xgenomics.com/samples/cell-exp/1.1.0/"
    "fresh_68k_pbmc_donor_a/"
    "fresh_68k_pbmc_donor_a_filtered_gene_bc_matrices.tar.gz"
)
# Local cache directory; the archive and extracted files persist across runs.
_CACHE_DIR = Path.home() / ".cache" / "cuml_examples" / "pbmc68k"
def _download_pbmc68k():
    """Download and extract the PBMC 68K matrix market files.

    The archive and extracted matrices are cached under ``_CACHE_DIR``,
    so repeat calls are effectively no-ops.

    Returns:
        Path to the directory containing ``matrix.mtx``.
    """
    mtx_dir = _CACHE_DIR / "filtered_matrices_mex" / "hg19"
    if (mtx_dir / "matrix.mtx").exists():
        return mtx_dir
    _CACHE_DIR.mkdir(parents=True, exist_ok=True)
    archive_path = _CACHE_DIR / "pbmc68k.tar.gz"
    if not archive_path.exists():
        print(" Downloading PBMC 68K dataset (~120 MB) ...")
        req = urllib.request.Request(_DATA_URL, headers={"User-Agent": "Mozilla/5.0"})
        # Stream to a temporary file and atomically rename afterwards, so an
        # interrupted download can never leave a truncated archive that a
        # later run would mistake for a complete one.
        tmp_path = archive_path.with_suffix(".part")
        with urllib.request.urlopen(req) as resp, open(tmp_path, "wb") as f:
            while True:
                chunk = resp.read(1 << 20)  # 1 MiB chunks
                if not chunk:
                    break
                f.write(chunk)
        tmp_path.replace(archive_path)
    print(" Extracting ...")
    with tarfile.open(archive_path) as tar:
        # filter="data" rejects unsafe members (absolute paths, traversal).
        tar.extractall(_CACHE_DIR, filter="data")
    return mtx_dir
def load_pbmc68k():
    """Return the PBMC 68K count matrix as a cells x genes CSR sparse matrix."""
    data_dir = _download_pbmc68k()
    # mmread yields genes x cells; transpose to the cells x genes convention.
    counts = scipy.io.mmread(data_dir / "matrix.mtx").T
    return scipy.sparse.csr_matrix(counts)
| # --------------------------------------------------------------------------- | |
| # Preprocessing (CPU, standard bioinformatics steps) | |
| # --------------------------------------------------------------------------- | |
| def _column_var_sparse(mat): | |
| """Population variance per column; zeros are implicit in the sparse structure.""" | |
| mat = mat.tocsr() | |
| n = mat.shape[0] | |
| col_sum = np.asarray(mat.sum(axis=0)).ravel() | |
| col_sum_sq = np.asarray(mat.power(2).sum(axis=0)).ravel() | |
| mean = col_sum / n | |
| return col_sum_sq / n - mean**2 | |
def preprocess(mat, min_cells=10, n_top_genes=2000):
    """Filter genes, normalize, log-transform, select highly variable genes.

    Args:
        mat: cells x genes sparse count matrix.
        min_cells: keep only genes detected (nonzero) in at least this many cells.
        n_top_genes: number of highly variable genes to retain.

    Returns:
        ``scipy.sparse.csr_matrix`` float32 of shape (n_cells, n_top_genes).
    """
    # Nonzero entries per CSC column == number of cells expressing each gene.
    gene_counts = np.diff(mat.tocsc().indptr)
    keep = gene_counts >= min_cells
    mat = mat[:, keep].tocsr()
    n_genes_after = mat.shape[1]
    print(f" Genes after filtering (>={min_cells} cells): {n_genes_after:,}")
    # Median-depth normalization. Guard against all-zero cells: np.divide
    # with `where=` leaves their scaling factor at 0 instead of raising /
    # producing inf, so empty cells simply stay all-zero.
    cell_sums = np.asarray(mat.sum(axis=1)).ravel()
    median_sum = np.median(cell_sums)
    scaling = np.divide(
        median_sum,
        cell_sums,
        out=np.zeros(cell_sums.shape, dtype=np.float64),
        where=cell_sums > 0,
    )
    mat = scipy.sparse.csr_matrix(mat.multiply(scaling[:, np.newaxis]))
    # CSR matrices always provide an element-wise log1p (implicit zeros stay
    # zero), so the former hasattr() fallback was dead code.
    mat = mat.log1p()
    # Keep the n_top_genes genes with the largest variance.
    gene_var = _column_var_sparse(mat)
    top_idx = np.argsort(gene_var)[-n_top_genes:]
    X = mat[:, top_idx].tocsr().astype(np.float32)
    nnz = X.nnz
    dens = 100.0 * nnz / max(X.shape[0] * X.shape[1], 1)
    print(f" Selected top {n_top_genes} highly variable genes")
    print(f" Final matrix: {X.shape[0]:,} cells x {X.shape[1]} genes (csr, nnz={nnz:,}, {dens:.2f}% dense)")
    return X
| # --------------------------------------------------------------------------- | |
| # Main workflow | |
| # --------------------------------------------------------------------------- | |
def main():
    """Run the full PBMC 68K pipeline: load, preprocess, reduce, embed, cluster.

    Prints progress for each of the 7 stages plus a per-stage timing
    summary at the end; returns None.
    """
    # 1. Load data
    print("\n[1/7] Loading PBMC 68K dataset ...")
    with timed("Load data"):
        mat = load_pbmc68k()
    n_cells, n_genes = mat.shape
    print(f" Raw matrix: {n_cells:,} cells x {n_genes:,} genes")
    print_header(n_cells, n_genes)
    # 2. Preprocess
    print("\n[2/7] Preprocessing (filter, normalize, log1p, HVG selection) ...")
    with timed("Preprocessing (CPU)"):
        X = preprocess(mat)
    # 3. Scale (with_mean=False keeps output sparse; centering would densify)
    print("\n[3/7] StandardScaler (with_mean=False, sparse-safe) ...")
    with timed("StandardScaler"):
        scaler = StandardScaler(with_mean=False)
        X_scaled = scaler.fit_transform(X)
    if scipy.sparse.issparse(X_scaled):
        print(f" Scaled matrix: csr, nnz={X_scaled.nnz:,}")
    else:
        print(f" Scaled matrix: dense {X_scaled.shape}")
    # 4. Dimensionality reduction (TruncatedSVD accepts sparse csr/csc)
    n_pcs = 50
    print(f"\n[4/7] TruncatedSVD (n_components={n_pcs}) ...")
    with timed("TruncatedSVD"):
        svd = TruncatedSVD(n_components=n_pcs, random_state=42)
        X_pca = svd.fit_transform(X_scaled)
    var_explained = svd.explained_variance_ratio_.sum()
    print(f" Explained variance ratio (sum over {n_pcs} components): {var_explained:.1%}")
    # 5. KNN graph
    k = 15
    print(f"\n[5/7] NearestNeighbors (k={k}) ...")
    with timed("NearestNeighbors"):
        nn = NearestNeighbors(n_neighbors=k).fit(X_pca)
        distances, indices = nn.kneighbors(X_pca)
    print(f" KNN graph built: {X_pca.shape[0]:,} cells, k={k}")
    # 6. UMAP
    print("\n[6/7] UMAP (n_components=2) ...")
    with timed("UMAP"):
        X_umap = umap.UMAP(
            n_components=2,
            n_neighbors=k,
            random_state=42,
        ).fit_transform(X_pca)
    # 7. HDBSCAN
    print("\n[7/7] HDBSCAN clustering ...")
    with timed("HDBSCAN"):
        labels = hdbscan.HDBSCAN(min_cluster_size=50).fit_predict(X_umap)
    # Label -1 is HDBSCAN's noise label, not a cluster.
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = int((labels == -1).sum())
    print(f" Clusters found: {n_clusters}")
    print(f" Noise cells: {n_noise:,} ({100 * n_noise / len(labels):.1f}%)")
    # Cluster size summary
    print("\n" + "-" * 70)
    print(" CLUSTER SIZES")
    print("-" * 70)
    unique, counts = np.unique(labels, return_counts=True)
    # Largest clusters first.
    for lbl, cnt in sorted(zip(unique, counts), key=lambda x: -x[1]):
        name = "Noise" if lbl == -1 else f"Cluster {lbl}"
        print(f" {name:<20s} {cnt:>7,} cells ({100 * cnt / len(labels):5.1f}%)")
    # Separate preprocessing vs ML timings
    preproc_keys = {"Load data", "Preprocessing (CPU)"}
    ml_time = sum(v for k, v in _timings.items() if k not in preproc_keys)
    preproc_time = sum(v for k, v in _timings.items() if k in preproc_keys)
    print(f"\n Preprocessing time: {preproc_time:.2f}s")
    print(f" ML pipeline time: {ml_time:.2f}s")
    print_timings()
# Script entry point.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment