Convert miRDeep2 counts CSV to nicer TSV tables
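The script below reads one or more miRDeep2 result CSVs, pulls out the novel and mature miRBase miRNA sections, tidies the columns, and writes a single merged TSV to stdout. A typical invocation might look like this (a sketch with hypothetical filenames; save the script under any name you like, and note that by default the sample ID is extracted from filenames matching _<sample>_collapsed.csv):

python mirdeep2_to_tsv.py results_sampleA_collapsed.csv results_sampleB_collapsed.csv > merged.tsv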
#!/usr/bin/env python
import argparse
import glob
import io
import logging
import re
import sys
from typing import List, Optional

import pandas as pd

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# By default, pull the sample ID out of filenames like "<anything>_<sample>_collapsed.csv"
DEFAULT_SAMPLE_ID_REGEX = r"_([a-zA-Z0-9-]+)_collapsed\.csv$"


def extract_sample_id(
    filename: str,
    sample_id: Optional[str] = None,
    sample_id_regex: Optional[str] = None,
) -> Optional[str]:
    """Return an explicit sample ID if given, otherwise extract one from the filename."""
    if sample_id:
        return sample_id
    sample_id_regex = sample_id_regex or DEFAULT_SAMPLE_ID_REGEX
    match = re.search(sample_id_regex, filename)
    if match:
        return match.group(1)
    return None
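
# e.g. extract_sample_id("results_sampleA_collapsed.csv") -> "sampleA"
# (hypothetical filename matching the default regex)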


def read_section(lines: List[str], start_line: str) -> List[str]:
    """Collect the lines following `start_line`, up to the next blank line."""
    start_index = next(
        (i for i, line in enumerate(lines) if line.startswith(start_line)), None
    )
    if start_index is None:
        logging.error(f"Start line '{start_line}' not found in file.")
        return []
    section_lines = []
    for line in lines[start_index + 1 :]:
        if line.strip() == "":
            break
        # Strip the trailing newline so the section can be cleanly re-joined later
        section_lines.append(line.rstrip("\n"))
    return section_lines


def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Lowercase column names and replace spaces/hyphens with underscores."""
    df.columns = df.columns.str.replace(r"[\s-]", "_", regex=True).str.lower()
    return df


def combine_id_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Merge 'provisional_id' (novel miRNAs) and 'tag_id' (known miRNAs) into one ID column."""
    # Guard against one section being absent, which would leave one of the columns missing
    for col in ("provisional_id", "tag_id"):
        if col not in df.columns:
            df[col] = pd.NA
    df["mirdeep2_id"] = df["provisional_id"].combine_first(df["tag_id"])
    df = df.drop(columns=["provisional_id", "tag_id"])
    return df


def combine_probability_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Merge the per-section 'estimated probability that ...' columns, then split
    the combined values into numeric probability and confidence-interval columns."""
    prob_cols = df.filter(like="estimated_probability_that").columns
    # The novel and known sections use slightly different column names; fold
    # however many are present into a single column
    combined = df[prob_cols[0]]
    for col in prob_cols[1:]:
        combined = combined.combine_first(df[col])
    df["estimated_probability"] = combined
    prob_values = (
        df["estimated_probability"]
        .str.replace("%", "")
        .str.replace(" ", "")
        .str.split(r"\+/\-", expand=True)
    )
    df["percent_probability"] = pd.to_numeric(prob_values[0], errors="coerce")
    df["percent_probability_ci"] = pd.to_numeric(prob_values[1], errors="coerce")
    df = df.drop(columns=prob_cols)
    df = df.drop(columns=["estimated_probability"])
    return df
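
# Worked example: an input value of "91 +/- 2%" becomes
# percent_probability=91.0 and percent_probability_ci=2.0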


def convert_p_value(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the yes/no 'significant randfold p-value' column to booleans."""
    df["significant_randfold_p_value"] = df["significant_randfold_p_value"].map(
        {"yes": True, "no": False}
    )
    return df


def replace_hyphens_with_na(df: pd.DataFrame) -> pd.DataFrame:
    """Replace the '-' placeholder used for missing fields with NA."""
    df = df.replace("-", pd.NA)
    return df


def split_precursor_coordinate(df: pd.DataFrame) -> pd.DataFrame:
    """Split 'precursor_coordinate' (chrom:start..end:strand) into separate columns."""
    coord_data = df["precursor_coordinate"].str.extract(
        r"(?P<chromosome>[^:]+):(?P<start>\d+)\.\.(?P<end>\d+):(?P<strand>[\+\-])"
    )
    df = pd.concat([df, coord_data], axis=1)
    return df
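
# e.g. a coordinate like "chr1:1000..1100:+" (hypothetical) splits into
# chromosome="chr1", start="1000", end="1100", strand="+"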


def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Apply all of the cleaning steps and put the key columns first."""
    df = clean_column_names(df)
    df = combine_id_columns(df)
    df = combine_probability_columns(df)
    df = convert_p_value(df)
    # NB: replace placeholder hyphens *before* splitting coordinates, so the
    # extracted '-' strand values are not themselves turned into NA
    df = replace_hyphens_with_na(df)
    df = split_precursor_coordinate(df)
    # Reorder columns: sample and miRDeep2 IDs first, probability columns up front
    cols = list(df.columns)
    cols.insert(0, cols.pop(cols.index("sample_id")))
    cols.insert(1, cols.pop(cols.index("mirdeep2_id")))
    cols.insert(4, cols.pop(cols.index("percent_probability")))
    cols.insert(5, cols.pop(cols.index("percent_probability_ci")))
    df = df[cols]
    return df


def process_file(
    input_file: str,
    sample_id: Optional[str] = None,
    sample_id_regex: Optional[str] = None,
) -> pd.DataFrame:
    """Parse one miRDeep2 result CSV and return a cleaned, merged DataFrame."""
    with open(input_file, "r") as file:
        lines = file.readlines()
    novel_section = read_section(lines, "novel miRNAs predicted by miRDeep2")
    mature_section = read_section(lines, "mature miRBase miRNAs detected by miRDeep2")
    if not novel_section and not mature_section:
        logging.error(f"Neither section found in file: {input_file}")
        return pd.DataFrame()
    sample_id_value = extract_sample_id(input_file, sample_id, sample_id_regex)
    # Parse whichever sections are present, tagging rows as novel vs known
    frames = []
    if novel_section:
        novel_df = pd.read_csv(io.StringIO("\n".join(novel_section)), sep="\t")
        novel_df["novel"] = True
        frames.append(novel_df)
    if mature_section:
        mature_df = pd.read_csv(io.StringIO("\n".join(mature_section)), sep="\t")
        mature_df["novel"] = False
        frames.append(mature_df)
    merged_df = pd.concat(frames, ignore_index=True)
    merged_df["sample_id"] = sample_id_value
    processed_df = process_dataframe(merged_df)
    return processed_df


def main(
    input_files: List[str], sample_id: Optional[str], sample_id_regex: Optional[str]
) -> None:
    all_dfs = []
    for input_file in input_files:
        merged_df = process_file(input_file, sample_id, sample_id_regex)
        if not merged_df.empty:
            all_dfs.append(merged_df)
    if all_dfs:
        # Write the combined table to stdout as TSV
        final_df = pd.concat(all_dfs, ignore_index=True)
        final_df.to_csv(sys.stdout, sep="\t", index=False)
    else:
        logging.error("No data to output.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Extract and merge miRNA prediction sections from miRDeep2 output."
    )
    parser.add_argument(
        "input_files",
        nargs="+",
        help="Input file paths from miRDeep2 (glob patterns supported)",
    )
    parser.add_argument(
        "--sample-id",
        help="Sample ID to use, overriding extraction from the filename",
        required=False,
    )
    parser.add_argument(
        "--sample-id-regex",
        help=f"Regex pattern to extract the sample ID from the filename (default: {DEFAULT_SAMPLE_ID_REGEX})",
        default=DEFAULT_SAMPLE_ID_REGEX,
    )
    args = parser.parse_args()

    # Expand any glob patterns ourselves (useful where the shell does not)
    expanded_files = []
    for pattern in args.input_files:
        expanded_files.extend(glob.glob(pattern))
    main(expanded_files, args.sample_id, args.sample_id_regex)
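
Once you have the merged table, a quick downstream check in pandas might look like this (a minimal sketch; merged.tsv is whatever the TSV output was redirected to, and the 90% threshold is arbitrary):

import pandas as pd

df = pd.read_csv("merged.tsv", sep="\t")
# Keep confidently predicted novel miRNAs (boolean 'novel' column set by the script above)
novel_hits = df[df["novel"] & (df["percent_probability"] >= 90)]
print(novel_hits[["sample_id", "mirdeep2_id", "percent_probability"]].head())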