pytom peak extraction
import re
from pathlib import Path
from typing import List, Optional, Tuple

import mrcfile
import numpy as np
import pandas as pd
import starfile
import typer
from imodmodel import ImodModel
from imodmodel.models import ContourType
from scipy.optimize import curve_fit
from skimage.feature import peak_local_max
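# Example invocations (illustrative; assumes this file is saved as peak_picker.py):
#   python peak_picker.py scores/*.mrc --out picked_coords
#   python peak_picker.py scores/TS_01.mrc --min-distance-ang 60 --sigma-multiplier 6
#   python peak_picker.py scores/*.mrc --fixed-threshold 7.5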
# Create a typer app for a clean CLI
app = typer.Typer(
    name="peak-picker",
    help=(
        "Processes one or more 3D score maps (.mrc files) with a three-step procedure:\n\n"
        "1. Robust normalization: each CC map is converted to a robust Z-score map, so "
        "voxel values express how many standard deviations a score lies above the "
        "background noise.\n\n"
        "2. Intelligent thresholding (default): rather than relying on a user-guessed "
        "cutoff, the script assumes the vast majority of the volume is noise following a "
        "Gaussian (bell curve) distribution, fits a Gaussian to the noise peak of the "
        "score histogram to estimate its mean (µ) and standard deviation (σ), and sets "
        "the threshold at µ + (σ * multiplier), where the multiplier (defaulting to 5.0) "
        "sets the desired statistical significance. Only peaks highly unlikely to be "
        "random noise are selected.\n\n"
        "3. Peak detection with non-maximum suppression (NMS): all peaks above the "
        "threshold are identified, and peaks closer than --min-distance-ang are "
        "suppressed so that each particle yields exactly one coordinate.\n\n"
        "To bypass the intelligent method, pass --fixed-threshold to enforce a specific "
        "Z-score cutoff uniformly across all processed maps."
    ),
    add_completion=False
)
def gaussian(x, amplitude, mean, stddev):
    """A 1D Gaussian function for fitting the noise distribution."""
    return amplitude * np.exp(-((x - mean) / stddev) ** 2 / 2)
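# Illustrative self-check (not executed): fitting a histogram of standard normal
# samples should recover mean ~ 0 and stddev ~ 1.
#   rng = np.random.default_rng(0)
#   hist, edges = np.histogram(rng.normal(size=100_000), bins=200, range=(-5, 5))
#   centers = (edges[:-1] + edges[1:]) / 2
#   params, _ = curve_fit(gaussian, centers, hist, p0=[hist.max(), 0.0, 1.0])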
def write_consolidated_star_file(
    output_dir: Path,
    all_particle_data: List[Tuple[str, int, int, int, float]],
    output_filename: str = "tomocpt_coords.star"
):
    """
    Writes particle coordinates from all processed tomograms to a WARP-compatible star file.

    Args:
        output_dir: The directory to save the star file in.
        all_particle_data: A list of tuples, where each tuple contains
            (tomo_name, z, y, x, score).
        output_filename: The name of the output star file.
    """
    if not all_particle_data:
        print("No particles were found across all maps. No STAR file will be generated.")
        return
    particles_dict = {
        "rlnMicrographName": [], "rlnCoordinateX": [], "rlnCoordinateY": [],
        "rlnCoordinateZ": [], "rlnAutopickFigureOfMerit": []
    }
    print(f"\nConsolidating coordinates from {len(all_particle_data)} particles into a STAR file...")
    for tomo_name, z, y, x, score in all_particle_data:
        # Map each score-map name to the corresponding WARP .tomostar name; this
        # assumes map filenames embed the pixel size as e.g. "_10.00Apx".
        tomo_name_warp = re.sub(r'_\d+\.\d+Apx', '.tomostar', tomo_name)
        particles_dict["rlnMicrographName"].append(tomo_name_warp)
        particles_dict["rlnCoordinateX"].append(x)
        particles_dict["rlnCoordinateY"].append(y)
        particles_dict["rlnCoordinateZ"].append(z)
        particles_dict["rlnAutopickFigureOfMerit"].append(score)
    df_particles = pd.DataFrame(data=particles_dict)
    output_path = output_dir / output_filename
    starfile.write({'particles': df_particles}, output_path, float_format="%.4f", overwrite=True)
    print(f"✅ Successfully saved consolidated coordinates to '{output_path}'")
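# The resulting file contains a single "data_particles" block, roughly:
#   data_particles
#   loop_
#   _rlnMicrographName #1
#   _rlnCoordinateX #2
#   _rlnCoordinateY #3
#   _rlnCoordinateZ #4
#   _rlnAutopickFigureOfMerit #5
#   TS_01.tomostar  512.0000  480.0000  128.0000  6.2000
# (the example row is illustrative)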
@app.command()
def run(
    score_maps: List[Path] = typer.Argument(..., help="Path to one or more score maps. Accepts glob patterns (e.g., 'scores/*.mrc')."),
    output_dir: Path = typer.Option("picked_coords", "--out", help="Directory to save output .mod and .star files."),
    min_distance_ang: float = typer.Option(40.0, help="Minimum distance between peaks in Angstroms (~template size)."),
    sigma_multiplier: float = typer.Option(5.0, help="[Intelligent Mode] Multiplier for the noise standard deviation used to set the threshold."),
    fixed_threshold: Optional[float] = typer.Option(None, "--fixed-threshold", help="[Manual Mode] A fixed, absolute Z-score threshold that overrides the intelligent method."),
    sphere_size: int = typer.Option(8, help="Diameter of the sphere drawn for each peak in the output IMOD model."),
    line_width: int = typer.Option(2, help="Thickness of the contour line for 2D views in IMOD."),
    star_filename: str = typer.Option("particles_coords.star", help="Name for the final output STAR file.")
):
    """
    Processes 3D cross-correlation maps to pick particle coordinates.
    """
    output_dir.mkdir(exist_ok=True)
    all_particles_for_star_file = []
    print(f"Found {len(score_maps)} score maps to process.")
    for mrc_path in score_maps:
        print(f"\n--- Processing: {mrc_path.name} ---")
        try:
            with mrcfile.open(mrc_path) as mrc:
                angpix = mrc.voxel_size.x
                min_distance_px = int(np.ceil(min_distance_ang / angpix))
                print(f"Opened MRC. Voxel size: {angpix:.2f} Å/px. Min distance: {min_distance_px} px.")
                # --- Step 1: Normalization ---
                processing_data = mrc.data.astype(np.float32)
                global_median = np.median(processing_data)
                mad = np.median(np.abs(processing_data - global_median))
                if mad == 0:
                    print("Median Absolute Deviation is zero. Cannot normalize. Skipping file.")
                    continue
                robust_std = 1.4826 * mad
                normalized = (processing_data - global_median) / robust_std
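                # Note: for Gaussian noise, MAD ≈ 0.6745σ (σ · Φ⁻¹(0.75)), so
                # σ ≈ 1.4826 · MAD; `normalized` is therefore a robust Z-score map.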
                # --- Step 2: Threshold Determination ---
                threshold = 0
                if fixed_threshold is not None:
                    print(f"Using user-provided fixed threshold: {fixed_threshold}")
                    threshold = fixed_threshold
                else:
                    print(f"Determining threshold intelligently with sigma multiplier: {sigma_multiplier}")
                    hist, bin_edges = np.histogram(normalized.flatten(), bins=500, range=(-5, 5))
                    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
                    try:
                        p0 = [np.max(hist), bin_centers[np.argmax(hist)], 1.0]
                        params, _ = curve_fit(gaussian, bin_centers, hist, p0=p0)
                        noise_mean, noise_std = params[1], abs(params[2])
                        threshold = noise_mean + sigma_multiplier * noise_std
                        print(f" - Fit OK. Noise µ={noise_mean:.3f}, σ={noise_std:.3f}")
                        print(f" - Calculated Threshold: {threshold:.4f}")
                    except RuntimeError:
                        print(" - Gaussian fit failed. Falling back to robust Z-score.")
                        threshold = sigma_multiplier
                        print(f" - Using Fallback Threshold: {threshold}")
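                # Worked example (illustrative): if the fit gives µ = 0.02 and
                # σ = 1.01, the default multiplier of 5.0 yields a threshold of
                # 0.02 + 5.0 * 1.01 = 5.07, i.e. voxels more than ~5σ above the noise.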
                # --- Step 3: Peak Detection ---
                peaks_coords = peak_local_max(
                    normalized,
                    min_distance=min_distance_px,
                    threshold_abs=threshold,
                    exclude_border=True
                )
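                # `min_distance` acts as the non-maximum suppression radius: among
                # candidate peaks closer than min_distance_px voxels, only the
                # strongest local maximum is kept, so each particle is picked once.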
                num_peaks = len(peaks_coords)
                print(f"Found {num_peaks} peaks.")
                if num_peaks == 0:
                    print("No peaks found, so no model file will be generated for this map.")
                    continue
                # --- Step 4: Create and Save IMOD Model ---
                df = pd.DataFrame(peaks_coords, columns=['z', 'y', 'x'])
                model = ImodModel.from_dataframe(df, type=ContourType.SCATTERED)
                if model.objects:
                    obj = model.objects[0]
                    obj.header.pdrawsize = sphere_size
                    obj.header.linewidth = line_width
                    obj.color = (0.0, 1.0, 0.0)  # Green
                output_mod_path = output_dir / mrc_path.with_suffix('.mod').name
                model.to_file(output_mod_path)
                print(f"Successfully saved IMOD model to '{output_mod_path}'")
                # --- Step 5: Collect data for the STAR file ---
                peak_scores = normalized[peaks_coords[:, 0], peaks_coords[:, 1], peaks_coords[:, 2]]
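                # peak_local_max returns coordinates in numpy axis order (z, y, x)
                # for a 3D array; the STAR writer above swaps these back to x, y, z.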
                for i in range(num_peaks):
                    z, y, x = peaks_coords[i]
                    score = peak_scores[i]
                    # Append (tomo_name, z, y, x, score)
                    all_particles_for_star_file.append((str(mrc_path), z, y, x, score))
        except Exception as e:
            print(f"An error occurred while processing {mrc_path.name}: {e}")
            continue
    # --- Final Step: Write the consolidated STAR file ---
    write_consolidated_star_file(output_dir, all_particles_for_star_file, star_filename)


if __name__ == "__main__":
    app()