Last active
November 26, 2019 01:06
-
-
Save mlin/5ac534d6655a687f0e35586c25b64c8a to your computer and use it in GitHub Desktop.
Split large VCF for parallel load into Spark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version 1.0 | |
task split_vcf_for_spark {
    # Quickly split a large .vcf.gz file into a specified number of compressed partitions.
    #
    # Motivation: calling SparkContext.textFile on a single large vcf.gz can be painfully slow,
    # because it's decompressed and parsed in ~1 thread. Use this to first split it up (with a
    # faster multithreaded pipeline); then tell Spark to parallel load the data using textFile on a
    # glob pattern.
    #
    # To maximize parallelism, the input VCF lines are dealt round-robin around to the partition
    # files; so each partition is internally sorted, but the Spark RDD/DataFrame would need to be
    # re-sorted if the original total ordering is desired.
    #
    # The VCF header is output in a separate, uncompressed text file.
    input {
        File vcf_gz         # input VCF, bgzip-compressed
        Int partitions = 16 # number of output partition files
        String name = basename(vcf_gz, ".vcf.gz")  # prefix for output filenames
        Int cpu = 16        # cores to request; also sets bgzip decompression threads
    }

    command <<<
        set -eux
        export LC_ALL=C
        # noninteractive so apt-get can never block on a prompt inside the container
        export DEBIAN_FRONTEND=noninteractive
        apt-get -qq update && apt-get -qq -y install pv tabix
        mkdir partitions
        # Extract the header; assume header text is less than 4MB. pipefail is deliberately
        # NOT yet set: head exits after 4MB, which kills the upstream bgzip with SIGPIPE.
        bgzip -dc "~{vcf_gz}" | head -c 4194304 | grep ^\# > "partitions/~{name}._HEADER"
        set -o pipefail
        # Multithreaded decompression (thread count now follows the cpu request instead of a
        # hard-coded 4), progress meter, then deal data lines round-robin into ~{partitions}
        # files; --filter recompresses each partition concurrently at low level (-l 1) for speed.
        # The -a width is the digit count of ~{partitions} so numeric suffixes don't overflow.
        bgzip -dc -@ ~{cpu} "~{vcf_gz}" | pv -abtfi 10 | \
            split -n r/~{partitions} --numeric-suffixes=1 -a $(expr length ~{partitions}) -u \
                --additional-suffix=of~{partitions}.vcf.gz \
                --filter='grep -v ^\# | bgzip -c -l 1 > $FILE' - "partitions/~{name}."
        # problems with other ways of using split:
        #   -n l/N: needs uncompressed input file
        #   --lines: need total # of lines to control # output partitions
        #   --line-bytes: liable to cut up individual lines longer than SIZE
        # also, these recompress the partitions one-at-a-time which is slower.
    >>>

    output {
        File header = glob("partitions/*._HEADER")[0]
        Array[File] partitions_vcf_gz = glob("partitions/*.vcf.gz")
    }

    runtime {
        cpu: cpu
        # scale memory with partition count (many concurrent bgzip --filter children)
        memory: "~{2+(if partitions >= 16 then partitions/8 else 2)}G"
        # need recent version bgzip supporting multithreading and libdeflate
        docker: "ubuntu:19.10"
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version 1.0 | |
import "split_vcf_for_spark.wdl" as tasks | |
# Integration test: fetch a real 1000 Genomes VCF (unless one is supplied), split it,
# then verify the split losslessly preserves the data.
workflow test {
    input {
        File? test_vcf_gz   # optional caller-supplied VCF; if absent, test_vcf_url is downloaded
        String test_vcf_url = "https://1000genomes.s3.amazonaws.com/release/20130502/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
    }

    # Download the test VCF only when the caller didn't provide one.
    if (!defined(test_vcf_gz)) {
        call aria2c {
            input:
                url = test_vcf_url
        }
    }
    # Effective input: the caller-supplied file if present, otherwise the download.
    File eff_test_vcf_gz = select_first([test_vcf_gz, aria2c.file])

    call tasks.split_vcf_for_spark {
        input:
            vcf_gz = eff_test_vcf_gz,
            partitions = 64
    }

    # Check the emitted header and partitions against the original file's contents.
    call validate {
        input:
            vcf_gz = eff_test_vcf_gz,
            header = split_vcf_for_spark.header,
            partitions_vcf_gz = split_vcf_for_spark.partitions_vcf_gz
    }
}
# Download a URL with aria2's multi-connection downloader and expose the single
# resulting file as the task output.
task aria2c {
    input {
        String url
        Int connections = 10  # parallel connections / splits for the download
    }

    command <<<
        set -euxo pipefail
        # download into a dedicated directory so the output glob matches exactly one file
        mkdir __out && cd __out
        aria2c \
            --file-allocation=none --retry-wait=2 --stderr=true \
            -x ~{connections} -j ~{connections} -s ~{connections} \
            "~{url}"
    >>>

    output {
        File file = glob("__out/*")[0]
    }

    runtime {
        docker: "hobbsau/aria2"
    }
}
# Verify the split was lossless: the emitted header must equal the input's header,
# and the sorted concatenation of all partitions' data lines must digest identically
# to the sorted data lines of the original VCF (round-robin dealing destroys order,
# so both sides are sorted before hashing).
task validate {
    input {
        File vcf_gz                    # original compressed VCF
        File header                    # header file produced by split_vcf_for_spark
        Array[File] partitions_vcf_gz  # partition files produced by split_vcf_for_spark
    }

    command <<<
        set -eux
        export LC_ALL=C  # byte-order collation so both sorts agree
        # compare the headers (same <4MB header assumption as the splitter; pipefail is
        # deliberately not set yet because head's early exit SIGPIPEs gzip)
        gzip -dc "~{vcf_gz}" | head -c 4194304 | grep ^\# > input_header
        diff input_header "~{header}" 2>&1
        # digest the uncompressed contents of vcf_gz, in the background so it runs
        # concurrently with digesting the partitions below
        set -o pipefail
        gzip -dc "~{vcf_gz}" | grep -v ^\# | sort | sha256sum > input_digest & pid=$!
        # "unsplit" the parts and digest them (partitions contain no header lines,
        # so no grep -v needed here)
        cat "~{write_lines(partitions_vcf_gz)}" | xargs -n 999999 gzip -dc | sort | sha256sum > output_digest
        # compare the digests; wait propagates the background pipeline's exit status
        # (non-zero under pipefail), so set -e fails the task if the digest failed
        wait $pid
        diff input_digest output_digest 2>&1
    >>>

    runtime {
        docker: "ubuntu:19.04"
        cpu: 8
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment