Last active
March 14, 2016 22:12
-
-
Save boydgreenfield/805ac27e0a6b9a5adea7 to your computer and use it in GitHub Desktop.
Simple no-dependency Python script for concatenating two paired end FASTQ files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
No dependency Python script for joining two paired end FASTQ files. | |
Supports concatenating reads with a separator ("NNNNN") or interleaving reads via the | |
--interleave option. Auto-detects gzip'd files, offers header checking via a --strict flag, | |
and supports output to STDOUT a gzip'd FASTQ or an uncompressed FASTQ (--uncompressed flag). | |
""" | |
import argparse | |
import gzip | |
import sys | |
def enforce_headers(f1_header, f2_header): | |
if f1_header[0] != "@" or f2_header[0] != "@": | |
raise Exception("Invalid input FASTQ files.") | |
if f1_header.strip().split(" ")[0] != f2_header.strip().split(" ")[0]: | |
raise Exception("Input FASTQ files do not share headers. " | |
"Check and re-run w/o --strict.") | |
def join_w_separator(f1, f2, f_out, strict=False, sep="NNNNN"): | |
readlines = True | |
ix = 0 | |
# Use 'G', which is in most Phred scores. | |
# http://en.wikipedia.org/wiki/FASTQ_format | |
phred_sep = "G" * len(sep) | |
while readlines: | |
f1_line = f1.readline() | |
f2_line = f2.readline() | |
if f1_line == "": | |
readlines = False | |
f1.close() | |
f2.close() | |
f_out.close() | |
if f2_line != "": | |
raise Exception("Input FASTQ files do not match in length.") | |
continue | |
if ix % 4 == 0: # Header | |
if strict: # Fail if they don't match up to the first whitespace | |
enforce_headers(f1_line, f2_line) | |
# Write the header out | |
f_out.write(f1_line) | |
elif (ix - 1) % 4 == 0: # Sequence | |
f_out.write(f1_line.strip() + sep + f2_line) | |
elif (ix - 2) % 4 == 0: # Separator | |
f_out.write("+\n") | |
else: | |
f_out.write(f1_line.strip() + phred_sep + f2_line) | |
ix += 1 | |
def join_interleaved(f1, f2, f_out, strict=False): | |
readlines = True | |
ix = 0 | |
f1_lines = [] | |
f2_lines = [] | |
def flush_buffer(f_out, f1_lines, f2_lines): | |
if f1_lines and f2_lines: | |
assert len(f1_lines) == 4 | |
assert len(f2_lines) == 4 | |
f_out.write(f1_lines[0]) | |
f_out.write(f1_lines[1]) | |
f_out.write(f1_lines[2]) | |
f_out.write(f1_lines[3]) | |
f_out.write(f2_lines[0]) | |
f_out.write(f2_lines[1]) | |
f_out.write(f2_lines[2]) | |
f_out.write(f2_lines[3]) | |
f1_lines = [] | |
f2_lines = [] | |
return f1_lines, f2_lines | |
while readlines: | |
f1_line = f1.readline() | |
f2_line = f2.readline() | |
if f1_line == "": | |
readlines = False | |
if f2_line != "": | |
raise Exception("Input FASTQ files do not match in length.") | |
break | |
if ix % 4 == 0: # Header #1 | |
if strict: | |
enforce_headers(f1_line, f2_line) | |
f1_lines, f2_lines = flush_buffer(f_out, f1_lines, f2_lines) | |
# Fill buffer up to 4 lines | |
f1_lines.append(f1_line) | |
f2_lines.append(f2_line) | |
ix += 1 | |
_, _ = flush_buffer(f_out, f1_lines, f2_lines) | |
f1.close() | |
f2.close() | |
f_out.close() | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description=("Join two FASTQ paired end read files. Defaults to interleaving reads. " | |
"Note: Requires input files to be sorted.")) | |
parser.add_argument("fastq1", help="First input FASTQ.") | |
parser.add_argument("fastq2", help="Second input FASTQ.") | |
parser.add_argument("output_fastq", nargs="?", | |
help=("Output FASTQ file name (optional, " | |
"streams to STDOUT if missing.")) | |
parser.add_argument("--sep", default="NNNNN", | |
help=("Optional separator to override default (NNNNN).")) | |
parser.add_argument("--no-interleave", action="store_true", | |
help="Concatenate the two reads using a separate (--sep) rather than " | |
"interleaving them (default).") | |
parser.add_argument("--strict", action="store_true", | |
help=("Enforce that the headers of the input " | |
"FASTQ files match until the first whitespace " | |
"delimiter (e.g., a space).")) | |
parser.add_argument("--gzip", action="store_true", | |
help=("Gzip output_fastq.")) | |
args = parser.parse_args() | |
if ".gz" in args.fastq1 or ".gzip" in args.fastq1: | |
f1 = gzip.open(args.fastq1, mode='r') | |
else: | |
f1 = open(args.fastq1, mode='r') | |
if ".gz" in args.fastq2 or ".gzip" in args.fastq2: | |
f2 = gzip.open(args.fastq2, mode='r') | |
else: | |
f2 = open(args.fastq2, mode='r') | |
if args.output_fastq is None: | |
f_out = sys.stdout | |
elif args.gzip: | |
f_out = gzip.open(args.output_fastq, mode='w') | |
else: | |
f_out = open(args.output_fastq, mode='w') | |
if not args.no_interleave: | |
join_interleaved(f1, f2, f_out, strict=args.strict) | |
else: | |
join_w_separator(f1, f2, f_out, strict=args.strict, sep=args.sep) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment