Skip to content

Instantly share code, notes, and snippets.

@betatim
Created March 24, 2026 14:51
Show Gist options
  • Select an option

  • Save betatim/f616d838a4b5bad6fc179a7369a557a5 to your computer and use it in GitHub Desktop.

Select an option

Save betatim/f616d838a4b5bad6fc179a7369a557a5 to your computer and use it in GitHub Desktop.
A generated workflow that performs a "single-cell" RNA-seq analysis. Its main purpose is to provide a baseline pipeline that a model can accelerate.
import tarfile
import time
import urllib.request
from collections import OrderedDict
from pathlib import Path
import numpy as np
import scipy.io
import scipy.sparse
from sklearn.decomposition import TruncatedSVD
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
import hdbscan
import umap
# ---------------------------------------------------------------------------
# Timing helpers
# ---------------------------------------------------------------------------
# Ordered mapping of step name -> elapsed seconds; preserves insertion
# order so the timing summary prints steps in execution order.
_timings = OrderedDict()


class timed:
    """Context manager that records a step's wall-clock duration in ``_timings``."""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *exc):
        # Record even on exception so partial runs still show timings.
        _timings[self.name] = time.perf_counter() - self._t0
def print_header(n_cells, n_genes):
    """Print the run banner with dataset dimensions."""
    rule = "=" * 70
    print(rule)
    print(" Single-Cell RNA-seq Analysis Pipeline (sparse matrix path)")
    print(" Mode: Results")
    print(f" Cells: {n_cells:,}")
    print(f" Genes: {n_genes:,}")
    print(rule)
def print_timings():
    """Print a table of every recorded step's duration plus the grand total."""
    grand_total = sum(_timings.values())
    print()
    print("=" * 70)
    print(" TIMING SUMMARY")
    print("-" * 70)
    for step, elapsed in _timings.items():
        print(f" {step:40s} {elapsed:>8.2f}s")
    print("-" * 70)
    print(f" {'TOTAL':40s} {grand_total:>8.2f}s")
    print("=" * 70)
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
# Source archive: 10x Genomics "Fresh 68k PBMCs (Donor A)" filtered
# gene/barcode matrices in Matrix Market format (~120 MB download).
_DATA_URL = (
    "https://cf.10xgenomics.com/samples/cell-exp/1.1.0/"
    "fresh_68k_pbmc_donor_a/"
    "fresh_68k_pbmc_donor_a_filtered_gene_bc_matrices.tar.gz"
)
# Local cache directory for the downloaded archive and extracted files,
# so repeat runs skip the download entirely.
_CACHE_DIR = Path.home() / ".cache" / "cuml_examples" / "pbmc68k"
def _download_pbmc68k():
    """Download (if needed) and extract the PBMC 68K matrix market files.

    Returns
    -------
    pathlib.Path
        Directory containing ``matrix.mtx``. Both the archive and the
        extracted tree are cached under ``_CACHE_DIR``, so subsequent
        calls are no-ops.
    """
    mtx_dir = _CACHE_DIR / "filtered_matrices_mex" / "hg19"
    if (mtx_dir / "matrix.mtx").exists():
        return mtx_dir  # already downloaded and extracted
    _CACHE_DIR.mkdir(parents=True, exist_ok=True)
    archive_path = _CACHE_DIR / "pbmc68k.tar.gz"
    if not archive_path.exists():
        # Plain string: the original used an f-string with no placeholders.
        print(" Downloading PBMC 68K dataset (~120 MB) ...")
        # Some CDNs reject urllib's default User-Agent, so spoof a browser.
        req = urllib.request.Request(_DATA_URL, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req) as resp, open(archive_path, "wb") as f:
            # Stream in 1 MiB chunks to keep memory bounded.
            while chunk := resp.read(1 << 20):
                f.write(chunk)
    print(" Extracting ...")
    with tarfile.open(archive_path) as tar:
        # filter="data" rejects unsafe members (path traversal, devices, ...).
        tar.extractall(_CACHE_DIR, filter="data")
    return mtx_dir
def load_pbmc68k():
    """Load the PBMC 68K count matrix (cells x genes, sparse)."""
    mtx_dir = _download_pbmc68k()
    # mmread returns genes x cells; transpose so rows are cells.
    counts = scipy.io.mmread(mtx_dir / "matrix.mtx").T
    return scipy.sparse.csr_matrix(counts)
# ---------------------------------------------------------------------------
# Preprocessing (CPU, standard bioinformatics steps)
# ---------------------------------------------------------------------------
def _column_var_sparse(mat):
    """Return the population variance of each column of a sparse matrix.

    Zeros are implicit in the sparse structure: they contribute to ``n``
    but not to the stored sums. Uses Var[x] = E[x^2] - E[x]^2, and clips
    the result at 0 because floating-point cancellation can otherwise
    produce tiny negative variances for near-constant columns.

    Parameters
    ----------
    mat : scipy.sparse matrix, shape (n_rows, n_cols)

    Returns
    -------
    numpy.ndarray, shape (n_cols,)
        Non-negative per-column population variances.
    """
    mat = mat.tocsr()
    n = mat.shape[0]
    col_sum = np.asarray(mat.sum(axis=0)).ravel()
    col_sum_sq = np.asarray(mat.power(2).sum(axis=0)).ravel()
    mean = col_sum / n
    # Clip: E[x^2] - E[x]^2 can dip below zero by rounding error.
    return np.maximum(col_sum_sq / n - mean**2, 0.0)
def preprocess(mat, min_cells=10, n_top_genes=2000):
    """Filter genes, normalize, log-transform, select highly variable genes.

    Parameters
    ----------
    mat : scipy.sparse matrix, shape (n_cells, n_genes)
        Raw count matrix, cells as rows.
    min_cells : int
        Keep only genes detected (nonzero) in at least this many cells.
    n_top_genes : int
        Number of highly variable genes to retain.

    Returns
    -------
    scipy.sparse.csr_matrix of float32, shape (n_cells, n_top_genes).
    """
    # Per-gene detection count: diff of the CSC indptr gives the number of
    # stored (nonzero) entries in each gene column.
    gene_counts = np.diff(mat.tocsc().indptr)
    keep = gene_counts >= min_cells
    mat = mat[:, keep].tocsr()
    n_genes_after = mat.shape[1]
    print(f" Genes after filtering (>={min_cells} cells): {n_genes_after:,}")
    # Library-size normalization: scale each cell to the median total count.
    cell_sums = np.asarray(mat.sum(axis=1)).ravel()
    median_sum = np.median(cell_sums)
    # Guard against all-zero cells: dividing by a zero sum would inject inf
    # into the sparse data. A zero row stays zero either way, so scale by 1.
    safe_sums = np.where(cell_sums > 0, cell_sums, 1.0)
    scaling = median_sum / safe_sums
    mat = scipy.sparse.csr_matrix(mat.multiply(scaling[:, np.newaxis]))
    # CSR matrices implement log1p natively (safe on sparse data since
    # log1p(0) == 0); the original hasattr fallback was dead code.
    mat = mat.log1p()
    # Keep the n_top_genes genes with the largest variance.
    gene_var = _column_var_sparse(mat)
    top_idx = np.argsort(gene_var)[-n_top_genes:]
    X = mat[:, top_idx].tocsr().astype(np.float32)
    nnz = X.nnz
    dens = 100.0 * nnz / max(X.shape[0] * X.shape[1], 1)
    print(f" Selected top {n_top_genes} highly variable genes")
    print(f" Final matrix: {X.shape[0]:,} cells x {X.shape[1]} genes (csr, nnz={nnz:,}, {dens:.2f}% dense)")
    return X
# ---------------------------------------------------------------------------
# Main workflow
# ---------------------------------------------------------------------------
def main():
    """Run the 7-step single-cell pipeline and print per-step timings.

    Steps: load -> preprocess -> scale -> TruncatedSVD -> KNN -> UMAP ->
    HDBSCAN. Each step runs inside a ``timed`` block so its duration is
    recorded in ``_timings`` for the final summary.

    NOTE(review): the source's indentation was flattened; the extent of
    each ``with timed(...)`` body below is reconstructed — confirm against
    the original gist.
    """
    # 1. Load data
    print("\n[1/7] Loading PBMC 68K dataset ...")
    with timed("Load data"):
        mat = load_pbmc68k()
    n_cells, n_genes = mat.shape
    print(f" Raw matrix: {n_cells:,} cells x {n_genes:,} genes")
    print_header(n_cells, n_genes)
    # 2. Preprocess
    print("\n[2/7] Preprocessing (filter, normalize, log1p, HVG selection) ...")
    with timed("Preprocessing (CPU)"):
        X = preprocess(mat)
    # 3. Scale (with_mean=False keeps output sparse; centering would densify)
    print("\n[3/7] StandardScaler (with_mean=False, sparse-safe) ...")
    with timed("StandardScaler"):
        scaler = StandardScaler(with_mean=False)
        X_scaled = scaler.fit_transform(X)
    if scipy.sparse.issparse(X_scaled):
        print(f" Scaled matrix: csr, nnz={X_scaled.nnz:,}")
    else:
        print(f" Scaled matrix: dense {X_scaled.shape}")
    # 4. Dimensionality reduction (TruncatedSVD accepts sparse csr/csc)
    n_pcs = 50
    print(f"\n[4/7] TruncatedSVD (n_components={n_pcs}) ...")
    with timed("TruncatedSVD"):
        svd = TruncatedSVD(n_components=n_pcs, random_state=42)
        X_pca = svd.fit_transform(X_scaled)
    var_explained = svd.explained_variance_ratio_.sum()
    print(f" Explained variance ratio (sum over {n_pcs} components): {var_explained:.1%}")
    # 5. KNN graph
    k = 15
    print(f"\n[5/7] NearestNeighbors (k={k}) ...")
    with timed("NearestNeighbors"):
        nn = NearestNeighbors(n_neighbors=k).fit(X_pca)
        distances, indices = nn.kneighbors(X_pca)
    print(f" KNN graph built: {X_pca.shape[0]:,} cells, k={k}")
    # 6. UMAP (2-D embedding of the PCA space, same k as the KNN step)
    print("\n[6/7] UMAP (n_components=2) ...")
    with timed("UMAP"):
        X_umap = umap.UMAP(
            n_components=2,
            n_neighbors=k,
            random_state=42,
        ).fit_transform(X_pca)
    # 7. HDBSCAN on the 2-D embedding; label -1 marks noise points
    print("\n[7/7] HDBSCAN clustering ...")
    with timed("HDBSCAN"):
        labels = hdbscan.HDBSCAN(min_cluster_size=50).fit_predict(X_umap)
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = int((labels == -1).sum())
    print(f" Clusters found: {n_clusters}")
    print(f" Noise cells: {n_noise:,} ({100 * n_noise / len(labels):.1f}%)")
    # Cluster size summary, largest first
    print("\n" + "-" * 70)
    print(" CLUSTER SIZES")
    print("-" * 70)
    unique, counts = np.unique(labels, return_counts=True)
    for lbl, cnt in sorted(zip(unique, counts), key=lambda x: -x[1]):
        name = "Noise" if lbl == -1 else f"Cluster {lbl}"
        print(f" {name:<20s} {cnt:>7,} cells ({100 * cnt / len(labels):5.1f}%)")
    # Separate preprocessing vs ML timings
    preproc_keys = {"Load data", "Preprocessing (CPU)"}
    ml_time = sum(v for k, v in _timings.items() if k not in preproc_keys)
    preproc_time = sum(v for k, v in _timings.items() if k in preproc_keys)
    print(f"\n Preprocessing time: {preproc_time:.2f}s")
    print(f" ML pipeline time: {ml_time:.2f}s")
    print_timings()
# Script entry point: run the full pipeline when executed directly.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment