The input VCF/BCF files must be bgzip-compressed and indexed (the workflow uses 'bcftools index -s' and '--regions'). Collect them into a list and run the workflow; the paths written into jeter.list must be absolute, because each file is read from a Nextflow work directory:

find path/to/dir -type f -name "S*.vcf.gz" > jeter.list
nextflow run biostar9526718.nf --vcfs ${PWD}/jeter.list

Run it as 'nextflow -C config.cfg run ...' to supply your own cluster configuration.
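A minimal sketch of such a config.cfg, assuming a SLURM cluster (the queue name, memory and time values are only examples to adapt; min_file_split is the parameter read by the split_list process below):

// config.cfg -- minimal sketch, assuming a SLURM cluster
params {
    // number of input VCFs below which no square-root chunking is done (see split_list)
    min_file_split = 100
}

process {
    executor = 'slurm'
    queue = 'general'      // hypothetical queue name, adjust to your cluster
    cpus = 1
    memory = '2 GB'
    time = '2h'
}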
/* author Pierre Lindenbaum PhD. Institut du Thorax. Nantes. France */

/* path to the list of indexed VCF/BCF files, one absolute path per line */
params.vcfs = ""

workflow {
	main:
		merged_ch = mergeBcfs(params, params.vcfs)
}
workflow mergeBcfs {
	take:
		meta
		vcfs
	main:
		contigs_ch = all_chromosomes(meta, vcfs)
		groups_ch = split_list(meta, vcfs)

		each_contig = contigs_ch.contigs.splitCsv(header: false, sep: '\t', strip: true).map{ T -> T[0] }
		each_group = groups_ch.groups.splitCsv(header: false, sep: '\t', strip: true).map{ T -> T[0] }

		group_contig_ch = merge_group_contig(meta, each_group.combine(each_contig))
		contig_ch = merge_contig(meta, group_contig_ch.contig_vcf.groupTuple())
		final_ch = merge_all(meta, contig_ch.vcf.collect())
	emit:
		vcf = final_ch.vcf
		index = final_ch.csi
}
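/*
 * How the channels above fan out and back in (illustration only; file names
 * and contigs are hypothetical, and the order of emitted items may vary):
 *
 *   each_group  = [ chunk.000000001.list, chunk.000000002.list ]
 *   each_contig = [ chr1, chr2 ]
 *
 *   combine()    -> every (group, contig) pair:
 *                   [chunk.000000001.list, chr1], [chunk.000000001.list, chr2],
 *                   [chunk.000000002.list, chr1], [chunk.000000002.list, chr2]
 *   groupTuple() -> one item per contig, carrying its per-chunk merged BCFs:
 *                   [chr1, [merged.0.bcf, merged.0.bcf]], [chr2, [ ... ]]
 *   collect()    -> a single list of per-contig BCFs, fed to the final concat.
 */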
/* scan the index of every input VCF/BCF and emit the list of distinct contigs */
process all_chromosomes {
	tag "${file(vcfs).name}"
	input:
		val(meta)
		val(vcfs)
	output:
		path("chroms.txt"), emit: contigs
	script:
	"""
	set -o pipefail
	cat ${vcfs} | while read V; do bcftools index -s "\${V}" | cut -f 1 ; done | sort -T . | uniq > chroms.txt
	"""
}
/* split the list of N VCFs into chunks of about sqrt(N) files */
process split_list {
	executor "local"
	tag "${file(vcfs).name}"
	input:
		val(meta)
		val(vcfs)
	output:
		path("groups.list"), emit: groups
	script:
		def min_file_split = meta.min_file_split ?: "100"
	"""
	SQRT=`awk 'END{X=NR;if(X<${min_file_split}){print(X);} else {z=sqrt(X); print (z==int(z)?z:int(z)+1);}}' "${vcfs}"`
	split -a 9 --additional-suffix=.list --lines=\${SQRT} "${vcfs}" chunk.
	find \${PWD} -type f -name "chunk.*.list" > groups.list
	"""
}
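/*
 * The awk one-liner above computes the chunk size: with N input files, use
 * chunks of ceil(sqrt(N)) files once N >= min_file_split, otherwise one single
 * chunk of N files. Worked examples (illustration only):
 *
 *     N =   50  -> SQRT = 50  ->  1 chunk  of 50 files
 *     N =  400  -> SQRT = 20  -> 20 chunks of 20 files
 *     N = 1000  -> SQRT = 32  -> 32 chunks of at most 32 files
 *
 * so the number of 'bcftools merge' jobs per contig stays around sqrt(N).
 */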
/* merge one chunk of VCFs for one contig */
process merge_group_contig {
	tag "${contig} ${file(group).name}"
	cpus 1
	input:
		val(meta)
		tuple val(group), val(contig)
	output:
		tuple val(contig), path("merged.0.bcf"), emit: contig_vcf
		path("merged.0.bcf.csi"), emit: csi
	script:
	"""
	bcftools merge --threads ${task.cpus} --file-list "${group}" --regions "${contig}" \
		-O b -o merged.0.bcf
	bcftools index --threads ${task.cpus} merged.0.bcf
	"""
}
/* for one contig, merge the per-chunk BCFs into a single per-contig BCF */
process merge_contig {
	tag "${contig} N=${L.size()}"
	cpus 1
	input:
		val(meta)
		tuple val(contig), val(L)
	output:
		path("merged.1.bcf"), emit: vcf
		path("merged.1.bcf.csi"), emit: csi
	script:
	"""
	cat <<- EOF > tmp0.list
	${L.join("\n")}
	EOF

	# order the chunk BCFs by their first sample name, so every contig is merged with the same sample order
	xargs -a tmp0.list -L 1 echo | while read F ; do bcftools query -l "\${F}" |\
		awk -vF=\${F} '(NR==1){printf("%s,%s\\n",\$1,F);}' ; done |\
		sort -t ',' -T . -k1,1 | cut -d, -f 2 | uniq > tmp.list

	bcftools merge --threads ${task.cpus} --file-list "tmp.list" --regions "${contig}" \
		-O b -o merged.1.bcf
	bcftools index --threads ${task.cpus} merged.1.bcf
	rm tmp0.list tmp.list
	"""
}
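/*
 * Why the sort above matters: groupTuple() does not guarantee the order of the
 * per-chunk BCFs, and 'bcftools concat' in the next step requires every
 * per-contig file to carry the same samples in the same column order. Sorting
 * the chunk files by their first sample name makes that order deterministic.
 * Illustration only (paths and sample names are hypothetical):
 *
 *     S42,/work/ab/merged.0.bcf          /work/cd/merged.0.bcf
 *     S01,/work/cd/merged.0.bcf    ->    /work/ab/merged.0.bcf
 */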
/* concatenate the per-contig BCFs into the final merged BCF */
process merge_all {
	tag "N=${L.size()}"
	cpus 1
	input:
		val(meta)
		val(L)
	output:
		path("merged.bcf"), emit: vcf
		path("merged.bcf.csi"), emit: csi
	script:
	"""
	cat <<- EOF > tmp.list
	${L.join("\n")}
	EOF

	bcftools concat --no-version \
		--allow-overlaps --remove-duplicates \
		-O b -o "merged.bcf" --file-list tmp.list

	bcftools index --threads ${task.cpus} merged.bcf
	rm tmp.list
	"""
}
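The final merged.bcf and merged.bcf.csi stay in the Nextflow work directory. A sketch of how one might copy them out through the configuration file; this is not part of the original workflow, the 'results' directory is just an example, and depending on your Nextflow version the process may need its fully qualified name:

// additional snippet for config.cfg -- copy the final output out of the work directory
process {
    withName: 'merge_all' {          // or 'mergeBcfs:merge_all' if the simple name does not match
        publishDir = [ path: 'results', mode: 'copy' ]
    }
}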