Find LCS in read pairs
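A minimal usage sketch (the script file name find_lcs.py and the read file names are illustrative, not part of the gist; the flags come from the argparse setup below):

    python find_lcs.py -1 reads_R1.fastq.gz -2 reads_R2.fastq.gz -t 8 -o overlaps.tsv.gz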
import argparse
import gzip
import io
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from itertools import zip_longest
from mimetypes import guess_type
from multiprocessing import Pool

from cdifflib import CSequenceMatcher
from tqdm import tqdm

# from https://stackoverflow.com/questions/53751050/python-multiprocessing-understanding-logic-behind-chunksize/54032744#54032744
def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate the chunksize argument for Pool methods.

    Resembles the source code in `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
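# Example (illustrative numbers): calc_chunksize(4, 100_000) computes
# divmod(100_000, 16) -> (6250, 0), i.e. a chunksize of 6250; a nonzero
# remainder (e.g. len_iterable=100_001) bumps it to 6251.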

# from https://groverj3.github.io/articles/2019-08-22_just-write-your-own-python-parsers-for-fastq-files.html
def parse_zip_longest(input_fastq):
    encoding = guess_type(input_fastq)[1]  # uses the file extension
    _open = partial(gzip.open, mode="rt") if encoding == "gzip" else open
    with _open(input_fastq) as input_handle:
        fastq_iterator = (line.rstrip() for line in input_handle)
        yield from zip_longest(*[fastq_iterator] * 4)
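# Example (illustrative record): each yielded item is a 4-tuple of the
# stripped FASTQ lines, e.g. ('@read1', 'GATTACA', '+', 'IIIIIII').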

# Read a FASTQ file and add its sequences to the shared dictionary
def update_dict(dict_, filename, progress_bar):
    for record in parse_zip_longest(filename):
        # dict_ is a defaultdict(list), so sequences from both files with
        # the same header accumulate in a single list
        dict_[record[0]].append(str(record[1]))
        progress_bar.update(1)

def find_lcs(seq1, seq2):
    """Compute the longest common substring of two strings using difflib."""
    matcher = CSequenceMatcher(None, seq1, seq2)
    match = matcher.find_longest_match(0, len(seq1), 0, len(seq2))
    return seq1[match.a : match.a + match.size]
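# Example (illustrative): find_lcs("GATTACA", "TTACAGG") returns "TTACA",
# the longest substring shared by the two reads.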

# Compute the LCS and the leftover sequence for one pair of reads
def compute_lcs_and_leftover(seq_tuple):
    name, (seq1, seq2) = seq_tuple
    lcs = find_lcs(seq1, seq2)
    # Skip pairs where both reads are fully covered by the LCS (identical reads)
    if len(lcs) == len(seq1) and len(lcs) == len(seq2):
        return None, (None, None, None)
    # Note: str.replace removes every occurrence of the LCS; for typical
    # read overlaps the LCS occurs once per read
    leftover1 = seq1.replace(lcs, "")
    leftover2 = seq2.replace(lcs, "")
    return name, (lcs, leftover1, leftover2)
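# Example (illustrative): compute_lcs_and_leftover(("read1", ["GATTACA", "TTACAGG"]))
# returns ("read1", ("TTACA", "GA", "GG")).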

# Main: read both FASTQ files in parallel and compute the LCS per pair
def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Identify the LCS of two FASTQ files")
    parser.add_argument(
        "-1",
        "--file1",
        type=str,
        required=True,
        help="First input file in FASTQ format",
    )
    parser.add_argument(
        "-2",
        "--file2",
        type=str,
        required=True,
        help="Second input file in FASTQ format",
    )
    parser.add_argument(
        "-t",
        "--threads",
        type=int,
        default=1,
        help="Number of threads to use for parallel processing",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.tsv.gz",
        help="Output file name (default: output.tsv.gz)",
    )
    args = parser.parse_args()
    # Shared dictionary to store sequences keyed by read header
    seq_dict = defaultdict(list)

    print("Reading FASTQ files...")
    # Initialize progress bars
    progress_bar1 = tqdm(total=0, desc=f"::: Reading {args.file1}", position=0, unit=" reads")
    progress_bar2 = tqdm(total=0, desc=f"::: Reading {args.file2}", position=1, unit=" reads")

    # Read both FASTQ files in parallel with progress bars
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [
            executor.submit(update_dict, seq_dict, filename, progress_bar)
            for filename, progress_bar in zip(
                [args.file1, args.file2], [progress_bar1, progress_bar2]
            )
        ]
        for future in as_completed(futures):
            future.result()  # surface any exception raised in a worker thread

    # Close progress bars
    progress_bar1.close()
    progress_bar2.close()
    len_all = len(seq_dict)
    print(f"::: Total number of combined reads: {len_all:,}")

    # Drop headers that were seen in only one of the two files
    print("Removing entries with only one sequence...")
    seq_dict = {k: v for k, v in seq_dict.items() if len(v) > 1}
    len_filtered = len(seq_dict)
    print(f"::: Total number of read pairs after filtering: {len_filtered:,}")
    # Compute the LCS and leftovers for each pair of sequences in parallel
    print("Finding read overlaps...")
    c_size = max(calc_chunksize(args.threads, len(seq_dict)), 5000)
    print(f"::: Chunksize: {c_size:,}")

    result_dict = {}
    progress_bar_lcs = tqdm(total=len(seq_dict), desc="::: Comparing reads")

    # Create a pool of worker processes and submit the pairs in chunks
    with Pool(args.threads) as p:
        for name, values in p.imap_unordered(
            compute_lcs_and_leftover, seq_dict.items(), chunksize=c_size
        ):
            # Keep only pairs with a partial overlap
            if values[0] is not None:
                result_dict[name] = values
            progress_bar_lcs.update(1)
    progress_bar_lcs.close()
print("Writing results...") | |
output_filename = args.output | |
if output_filename.endswith(".gz"): | |
with gzip.GzipFile(output_filename, mode="wb", compresslevel=6) as gz_file: | |
with io.BufferedWriter(gz_file, buffer_size=100 * (2**20)) as buffer: | |
buffer.write( | |
f"name\tlcs\tleftover_file1\tleftover_file2\n".encode("utf-8") | |
) | |
for name, values in tqdm( | |
result_dict.items(), | |
desc="::: Entries written", | |
total=len(result_dict), | |
): | |
buffer.write( | |
f"{name}\t{values[0]}\t{values[1]}\t{values[2]}\n".encode( | |
"utf-8" | |
) | |
) | |
else: | |
with io.BufferedWriter( | |
open(output_filename, mode="wb"), buffer_size=100 * (2**20) | |
) as buffer: | |
buffer.write(f"name\tlcs\tleftover_file1\tleftover_file2\n".encode("utf-8")) | |
for name, values in tqdm( | |
result_dict.items(), | |
desc="::: Entries written", | |
total=len(result_dict), | |
): | |
buffer.write( | |
f"{name}\t{values[0]}\t{values[1]}\t{values[2]}\n".encode("utf-8") | |
) | |

if __name__ == "__main__":
    main()

Conda environment for running the script:
name: compare-ar
channels:
  - conda-forge
  - bioconda
  - defaults
dependencies:
  - python>=3.9
  - tqdm
  - pip
  - pip:
      - cdifflib
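
To build and activate the environment with standard conda commands (the file name environment.yml is assumed; the environment name compare-ar comes from the YAML above):

    conda env create -f environment.yml
    conda activate compare-ar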