
@genomewalker
Last active March 31, 2023 22:26
Find LCS in read pairs
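The gist holds a Python script that pairs reads by header across two FASTQ files, computes the longest common substring (LCS) of each pair with cdifflib, and writes the LCS plus the leftover sequence from each mate to a TSV file, together with a conda environment file for its dependencies. A hedged usage sketch, assuming the script is saved as find_lcs.py (the gist does not give a filename): python find_lcs.py -1 reads_R1.fastq.gz -2 reads_R2.fastq.gz -t 8 -o overlaps.tsv.gz
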
from tqdm import tqdm
from collections import defaultdict
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool
import gzip
from itertools import zip_longest
from mimetypes import guess_type
from functools import partial
from cdifflib import CSequenceMatcher
import io


# from https://stackoverflow.com/questions/53751050/python-multiprocessing-understanding-logic-behind-chunksize/54032744#54032744
def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.
    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
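
# Worked example (illustrative): calc_chunksize(8, 1_000_000) computes
# divmod(1_000_000, 8 * 4) == (31250, 0) and therefore returns 31250.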


# from https://groverj3.github.io/articles/2019-08-22_just-write-your-own-python-parsers-for-fastq-files.html
def parse_zip_longest(input_fastq):
    encoding = guess_type(input_fastq)[1]  # uses file extension
    _open = partial(gzip.open, mode="rt") if encoding == "gzip" else open
    with _open(input_fastq) as input_handle:
        fastq_iterator = (line.rstrip() for line in input_handle)
        for record in zip_longest(*[fastq_iterator] * 4):
            yield record
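
# Each yielded record is a 4-tuple of consecutive FASTQ lines, e.g. (illustrative):
# ("@read1", "ACGTACGTTT", "+", "IIIIIIIIII")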


# Define function to read FASTQ file and update shared dictionary with sequences
def update_dict(dict_, filename, progress_bar):
    data = parse_zip_longest(filename)
    for record in data:
        seq = str(record[1])
        if record[0] in dict_:
            dict_[record[0]].append(seq)
        else:
            dict_[record[0]] = [seq]
        progress_bar.update(1)
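
# Note (reading of the code above): reads are paired by the full header line
# (record[0]), so a pair is only formed when the header is byte-identical in
# both input files.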


def find_lcs(seq1, seq2):
    """
    Computes the longest common substring of two input strings using
    cdifflib's CSequenceMatcher (a C implementation of difflib's SequenceMatcher).
    """
    matcher = CSequenceMatcher(None, seq1, seq2)
    match = matcher.find_longest_match(0, len(seq1), 0, len(seq2))
    return seq1[match.a : match.a + match.size]
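
# Example (illustrative): find_lcs("ACGTACGTTT", "GGACGTACGT") -> "ACGTACGT"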


# Compute LCS and leftover for each pair of sequences in the dictionary, in parallel
def compute_lcs_and_leftover(seq_tuple):
    name = seq_tuple[0]
    seq1, seq2 = seq_tuple[1]
    lcs = find_lcs(seq1, seq2)
    if len(lcs) == len(seq1) and len(lcs) == len(seq2):
        return None, (None, None)
    leftover1 = seq1.replace(lcs, "")
    leftover2 = seq2.replace(lcs, "")
    return name, (lcs, leftover1, leftover2)
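
# Example (illustrative): compute_lcs_and_leftover(("read1", ("ACGTACGTTT", "GGACGTACGT")))
# returns ("read1", ("ACGTACGT", "TT", "GG")); identical reads yield (None, (None, None))
# and are skipped when results are collected.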


# Define main function to read both FASTQ files in parallel and compute LCS
def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Identify the LCS of two FASTQ files")
    parser.add_argument(
        "-1",
        "--file1",
        type=str,
        required=True,
        help="First input file in FASTQ format",
    )
    parser.add_argument(
        "-2",
        "--file2",
        type=str,
        required=True,
        help="Second input file in FASTQ format",
    )
    parser.add_argument(
        "-t",
        "--threads",
        type=int,
        default=1,
        help="Number of threads to use for parallel processing",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.tsv.gz",
        help="Output file name (default: output.tsv.gz)",
    )
    args = parser.parse_args()
    # Initialize shared dictionary to store sequences
    seq_dict = defaultdict(list)
    print("Reading FASTQ files...")

    # Initialize progress bars
    progress_desc1 = f"::: Reading {args.file1}"
    progress_bar1 = tqdm(total=0, desc=progress_desc1, position=0, unit=" reads")
    progress_desc2 = f"::: Reading {args.file2}"
    progress_bar2 = tqdm(total=0, desc=progress_desc2, position=1, unit=" reads")

    # Read both FASTQ files in parallel with progress bars
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = []
        for filename, progress_bar in zip(
            [args.file1, args.file2], [progress_bar1, progress_bar2]
        ):
            futures.append(
                executor.submit(update_dict, seq_dict, filename, progress_bar)
            )
        for future in as_completed(futures):
            pass

    # Close progress bars
    progress_bar1.close()
    progress_bar2.close()

    len_all = len(seq_dict)
    print(f"::: Total number of combined reads: {len_all:,}")

    # Filter out elements with only one sequence
    print("Removing elements with only one sequence...")
    seq_dict = {k: v for k, v in seq_dict.items() if len(v) > 1}
    len_filtered = len(seq_dict)
    print(f"::: Total number of reads after filtering: {len_filtered:,}")
    # Set up multiprocessing pool and compute LCS and leftover for each pair
    # of sequences in the dictionary, in parallel
    print("Finding read overlaps...")
    c_size = calc_chunksize(args.threads, len(seq_dict))
    if c_size < 5000:
        c_size = 5000
    print(f"::: Chunksize: {c_size:,}")

    result_dict = {}
    total_pairs = len(seq_dict)
    progress_bar_lcs = tqdm(
        total=total_pairs,
        desc="::: Comparing reads",
    )

    # Create a pool of worker processes
    with Pool(args.threads) as p:
        # Submit each task to the pool
        for name, values in p.imap_unordered(
            compute_lcs_and_leftover, seq_dict.items(), chunksize=c_size
        ):
            # Add results to dictionary
            if values[0] is not None:
                result_dict[name] = values
            # Update progress bar
            progress_bar_lcs.update(1)

    # Close progress bar
    progress_bar_lcs.close()
print("Writing results...")
output_filename = args.output
if output_filename.endswith(".gz"):
with gzip.GzipFile(output_filename, mode="wb", compresslevel=6) as gz_file:
with io.BufferedWriter(gz_file, buffer_size=100 * (2**20)) as buffer:
buffer.write(
f"name\tlcs\tleftover_file1\tleftover_file2\n".encode("utf-8")
)
for name, values in tqdm(
result_dict.items(),
desc="::: Entries written",
total=len(result_dict),
):
buffer.write(
f"{name}\t{values[0]}\t{values[1]}\t{values[2]}\n".encode(
"utf-8"
)
)
else:
with io.BufferedWriter(
open(output_filename, mode="wb"), buffer_size=100 * (2**20)
) as buffer:
buffer.write(f"name\tlcs\tleftover_file1\tleftover_file2\n".encode("utf-8"))
for name, values in tqdm(
result_dict.items(),
desc="::: Entries written",
total=len(result_dict),
):
buffer.write(
f"{name}\t{values[0]}\t{values[1]}\t{values[2]}\n".encode("utf-8")
)


if __name__ == "__main__":
    main()
name: compare-ar
channels:
  - conda-forge
  - bioconda
  - defaults
dependencies:
  - python>=3.9
  - tqdm
  - pip
  - pip:
      - cdifflib
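
To recreate the environment, the usual approach would be conda env create -f compare-ar.yml (the environment file name is an assumption; the gist does not specify one), followed by conda activate compare-ar.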