find path/to/dir -type f -name "S*.vcf.gz" > jeter.list
nextflow run --vcfs ${PWD}/jeter.list biostar9526718.nf
Add `-C config.cfg` to configure your cluster config.
/* author Pierre Lindenbaum PhD, Institut du Thorax, Nantes, France */

/** path to a text file containing the paths of the VCF files to merge, one per line */
params.vcfs = ""

workflow {
	main:
		/* run the hierarchical merge on the user-provided list of VCFs */
		merged_ch = mergeBcfs(params, params.vcfs)
}
/*
 * Hierarchical merge of many single-sample VCFs:
 * level 1 merges groups of ~sqrt(N) files per chromosome,
 * level 2 merges the per-group results of each chromosome,
 * level 3 concatenates the chromosomes into one BCF.
 */
workflow mergeBcfs {
	take:
		meta	/* map of pipeline parameters */
		vcfs	/* path of the file listing the input VCFs */
	main:
		/* all distinct chromosomes seen in the inputs */
		contigs_ch = all_chromosomes(meta, vcfs)
		/* the input list split into smaller sub-lists */
		groups_ch = split_list(meta, vcfs)

		/* one channel item per chromosome */
		each_contig = contigs_ch.contigs
			.splitCsv(header: false, sep: '\t', strip: true)
			.map { row -> row[0] }
		/* one channel item per sub-list of VCFs */
		each_group = groups_ch.groups
			.splitCsv(header: false, sep: '\t', strip: true)
			.map { row -> row[0] }

		/* level 1: each (group x chromosome) pair */
		group_contig_ch = merge_group_contig(meta, each_group.combine(each_contig))
		/* level 2: all groups of the same chromosome */
		contig_ch = merge_contig(meta, group_contig_ch.contig_vcf.groupTuple())
		/* level 3: every chromosome into the final file */
		final_ch = merge_all(meta, contig_ch.vcf.collect())
	emit:
		vcf = final_ch.vcf
		index = final_ch.csi
}
/* scan the index of each input VCF and emit the distinct chromosome names */
process all_chromosomes {
	tag "${file(vcfs).name}"
	input:
		val(meta)
		val(vcfs)
	output:
		path("chroms.txt"),emit:contigs
	script:
	"""
	set -o pipefail
	# 'bcftools index -s' prints one 'contig<TAB>length<TAB>nrecords' line per contig
	while read V
	do
		bcftools index -s "\${V}" | cut -f 1
	done < ${vcfs} | sort -T . | uniq > chroms.txt
	"""
}
/*
 * Split the list of input VCFs into sub-lists of about sqrt(N) lines,
 * so the merge can proceed as a two-level tree instead of one huge merge.
 * Fix: the temporary chunk prefix was misspelled "chunck." — renamed to
 * "chunk." in both the 'split' and the 'find' commands (self-consistent).
 */
process split_list {
	executor "local"
	tag "${file(vcfs).name}"
	input:
		val(meta)
		val(vcfs)
	output:
		path("groups.list"),emit:groups
	script:
		/* below this number of files, keep everything in a single group */
		def min_file_split = meta.min_file_split?:"100"
	"""
	# lines per chunk: N when N < min_file_split, otherwise ceil(sqrt(N))
	SQRT=`awk 'END{X=NR;if(X<${min_file_split}){print(X);} else {z=sqrt(X); print (z==int(z)?z:int(z)+1);}}' "${vcfs}"`
	# write each sub-list as chunk.<suffix>.list in the work directory
	split -a 9 --additional-suffix=.list --lines=\${SQRT} "${vcfs}" chunk.
	# record the absolute path of every sub-list
	find \${PWD} -type f -name "chunk.*.list" > groups.list
	"""
}
/* level 1: merge one group of VCFs restricted to a single chromosome */
process merge_group_contig {
	tag "${contig} ${file(group).name}"
	cpus 1
	input:
		val(meta)
		tuple val(group),val(contig)
	output:
		tuple val(contig),path("merged.0.bcf"),emit:contig_vcf
		path("merged.0.bcf.csi"),emit:csi
	script:
	"""
	# merge the VCFs listed in 'group', keeping only records on 'contig'
	bcftools merge --threads ${task.cpus} --regions "${contig}" --file-list "${group}" -O b -o merged.0.bcf
	# index is required for the next --regions based merge
	bcftools index --threads ${task.cpus} merged.0.bcf
	"""
}
/*
 * Level 2: merge all the per-group BCFs of one chromosome into one file.
 * Before merging, the file list is re-ordered by each file's FIRST sample
 * name so that every chromosome is merged with the files in the same order
 * (bcftools merge output column order follows the file-list order).
 */
process merge_contig {
tag "${contig} N=${L.size()}"
cpus 1
input:
val(meta)
/* contig name + list of per-group BCFs for that contig (from groupTuple) */
tuple val(contig),val(L)
output:
path("merged.1.bcf"),emit:vcf
path("merged.1.bcf.csi"),emit:csi
script:
"""
cat <<- EOF > tmp0.list
${L.join("\n")}
EOF
# assert files are sorted in the same sample order for each contig
# for each file F: print "<first-sample>,<F>", sort by sample, keep the paths
xargs -a tmp0.list -L 1 echo | while read F ; do bcftools query -l "\${F}" |\
awk -vF=\${F} '(NR==1){printf("%s,%s\\n",\$1,F);}' ; done |\
sort -t ',' -T . -k1,1 | cut -d, -f 2 | uniq > tmp.list
bcftools merge --threads ${task.cpus} --file-list "tmp.list" --regions "${contig}" \
-O b -o merged.1.bcf
bcftools index --threads ${task.cpus} merged.1.bcf
rm tmp0.list tmp.list
"""
}
/* level 3: concatenate the per-chromosome BCFs into the final merged.bcf */
process merge_all {
	tag "N=${L.size()}"
	cpus 1
	input:
		val(meta)
		val(L)
	output:
		path("merged.bcf"),emit:vcf
		path("merged.bcf.csi"),emit:csi
	script:
	"""
	# write the list of per-chromosome BCFs, one path per line
	cat > tmp.list <<- EOF
${L.join("\n")}
	EOF
	# concatenate; overlapping/duplicated records across inputs are resolved
	bcftools concat --no-version --allow-overlaps --remove-duplicates --file-list tmp.list -O b -o "merged.bcf"
	bcftools index --threads ${task.cpus} merged.bcf
	rm tmp.list
	"""
}