bactopia-wrapper-nextflow
#! /usr/bin/env nextflow
import groovy.json.JsonSlurper
import groovy.text.SimpleTemplateEngine
import groovy.util.FileNameByRegexFinder
import java.nio.file.Path
import java.nio.file.Paths
import nextflow.util.SysHelper

PROGRAM_NAME = workflow.manifest.name
VERSION = workflow.manifest.version

// Setup output directories
datasets = params.datasets ? params.datasets : '/home/ec2-user/datasets'
bactopia = params.bactopia ? params.bactopia : '/home/ec2-user/bactopia'
conda = params.conda ? params.conda : '/home/ec2-user/bactopia-envs'
outdir = params.outdir ? params.outdir : './'
SERVERS = ['olaf', 'loma']
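// Each accession is randomly assigned to one of these servers (see process_accessions
// below), so results are published under separate "<outdir>/<server>/" subdirectories.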
// Setup some defaults
log.info "${PROGRAM_NAME} - ${VERSION}"

process bactopia {
    /* Run Bactopia on a single accession and package up the results. */
    publishDir "${outdir}/${server}/complete/", mode: "${params.publish_mode}", overwrite: params.overwrite, pattern: "${accession}-complete.tar.gz"
    publishDir "${outdir}/${server}/incomplete/", mode: "${params.publish_mode}", overwrite: params.overwrite, pattern: "${accession}-incomplete.tar.gz"
    tag "${accession}"

    input:
    set val(accession), val(server) from create_input_channel()

    output:
    file "${accession}-incomplete.tar.gz" optional true
    file "${accession}-complete.tar.gz" optional true

    shell:
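    // allow_failed: on the third attempt, package partial results as
    // "<accession>-incomplete.tar.gz" instead of deleting the run directory.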
    allow_failed = false
    if (task.attempt == 3) {
        allow_failed = true
    }
    opts = "${bactopia}/main.nf -c ${bactopia}/nextflow.config --accession ${accession} --datasets ${datasets} --condadir ${conda} --species 'Staphylococcus aureus' --genome_size median -qs ${task.cpus}"
    """
    mkdir !{accession}-run
    cd !{accession}-run
    if nextflow run !{opts}; then
        # Everything finished, pack up the results and clean up
        mkdir complete/
        mv bactopia-info/ !{accession}/ complete/
        tar -cf - complete/ | pigz -n --best -p !{task.cpus} > !{accession}-complete.tar.gz
        mv !{accession}-complete.tar.gz ../
    else
        # Run failed
        if [[ "!{allow_failed}" == 'true' ]]; then
            # We've tried enough times, pack it up
            mkdir incomplete
            mv .nextflow* work/* bactopia-info/ !{accession}/ incomplete/
            tar -cf - incomplete/ | pigz -n --best -p !{task.cpus} > !{accession}-incomplete.tar.gz
            mv !{accession}-incomplete.tar.gz ../
        else
            rm -rf !{accession}-run
            exit 1
        fi
    fi
    cd ..
    rm -rf !{accession}-run
    """
}

workflow.onComplete {
    workDir = new File("${workflow.workDir}")
    workDirSize = toHumanString(workDir.directorySize())

    println """
    Bactopia Execution Summary
    ---------------------------
    Command Line    : ${workflow.commandLine}
    Resumed         : ${workflow.resume}
    Completed At    : ${workflow.complete}
    Duration        : ${workflow.duration}
    Success         : ${workflow.success}
    Exit Code       : ${workflow.exitStatus}
    Error Report    : ${workflow.errorReport ?: '-'}
    Launch Dir      : ${workflow.launchDir}
    Working Dir     : ${workflow.workDir}
    Working Dir Size: ${workDirSize}
    """
}

// Utility functions
def toHumanString(bytes) {
    // Thanks Niklaus
    // https://gist.github.com/nikbucher/9687112
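    // Worked example: toHumanString(1536) returns "1.5K"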
    base = 1024L
    decimals = 3
    prefix = ['', 'K', 'M', 'G', 'T']
    int i = Math.log(bytes)/Math.log(base) as Integer
    i = (i >= prefix.size() ? prefix.size()-1 : i)
    return Math.round((bytes / base**i) * 10**decimals) / 10**decimals + prefix[i]
}

def print_version() {
    println(PROGRAM_NAME + ' ' + VERSION)
    exit 0
}

def process_accessions(accession) {
    /* Pair the accession with a randomly selected server */
    if (accession.length() > 0) {
        Random r = new Random()
        server = SERVERS[r.nextInt(SERVERS.size)]
        return tuple(accession, server)
    }
}

def create_input_channel() {
    if (params.accession) {
        return process_accessions(params.accession)
    } else {
        return Channel.fromPath( file(params.accessions) )
            .splitText()
            .map { line -> process_accessions(line.trim()) }
    }
}
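// Example (hypothetical accessions): an --accessions file containing SRX0000001 and
// SRX0000002 would be emitted as tuples such as ['SRX0000001', 'loma'] and
// ['SRX0000002', 'olaf'], with the server drawn at random for each accession.
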
def print_usage() {
    log.info """
    ${PROGRAM_NAME} v${VERSION}
    ${basic_help()}
    """.stripIndent()

    if (params.conda_help) {
        // Clean up the directory
        // This is only meant to be used with tests for conda build
        file("./work/").deleteDir()
        file("./.nextflow/").deleteDir()
        def files = new FileNameByRegexFinder().getFileNames('./', '.nextflow.log*')
        files.each { new File(it).delete() }
    }
    exit 0
}

def basic_help() {
    genome_size = params.genome_size ? params.genome_size : "Mash Estimate"
    return """
    Required Parameters:
        ### For Processing Multiple Samples
        --fastqs STR An input file containing the sample name and
            absolute paths to FASTQ/FASTAs to process
        ### For Processing A Single Sample
        --R1 STR First set of reads for paired end in compressed (gzip)
            FASTQ format
        --R2 STR Second set of reads for paired end in compressed (gzip)
            FASTQ format
        --SE STR Single end set of reads in compressed (gzip) FASTQ format
        --hybrid The SE should be treated as long reads for hybrid assembly.
        --sample STR The name of the input sequences
        ### For Downloading from SRA/ENA or NCBI Assembly
        **Note: Assemblies will have error-free Illumina reads simulated for processing.**
        --accessions An input file containing ENA/SRA Experiment accessions or
            NCBI Assembly accessions to be processed
        --accession A single ENA/SRA Experiment accession or NCBI Assembly accession
            to be processed
        ### For Processing an Assembly
        **Note: The assembly will have error-free Illumina reads simulated for processing.**
        --assembly STR An assembled genome in compressed FASTA format.
        --reassemble The simulated reads will be used to create a new assembly.
            Default: Use the original assembly, do not reassemble
    Dataset Parameters:
        --datasets DIR The path to available datasets that have
            already been set up
        --species STR Determines which species-specific dataset to
            use for the input sequencing
    Optional Parameters:
        --coverage INT Reduce samples to a given coverage
            Default: ${params.coverage}x
        --genome_size INT Expected genome size (bp) for all samples, a value of '0'
            will disable read error correction and read subsampling.
            Special values (requires --species):
                'min': uses minimum completed genome size of species
                'median': uses median completed genome size of species
                'mean': uses mean completed genome size of species
                'max': uses max completed genome size of species
            Default: Mash estimate
        --outdir DIR Directory to write results to
            Default: ${params.outdir}
    Nextflow Queue Parameters:
        At execution, Nextflow creates a queue and the number of slots in the queue is determined by the total number
        of cores on the system. When a task is submitted to the queue, the total number of slots it occupies is
        determined by the value set by "--cpus".
        This can have a significant effect on the efficiency of Nextflow's queue system. If "--cpus" is set to a
        value that is equal to the number of cores available, in most cases only a single task will be able to run
        because it is occupying all available slots.
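        For example, on a 16-core machine "--cpus 4" would allow up to four tasks to run concurrently, while
        "--cpus 16" would limit execution to a single task at a time.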
        When in doubt, "--cpus 4" is a safe bet; it is also the default value if you don't use "--cpus".
        --max_time INT The maximum number of minutes a single task should run before being halted.
            Default: ${params.max_time} minutes
        --max_memory INT The maximum amount of memory (Gb) allowed to a single task.
            Default: ${params.max_memory} Gb
        --cpus INT Number of processors made available to a single task.
            Default: ${params.cpus}
        -qs Nextflow queue size. This parameter is very useful to limit the total number of
            processors used on desktops, laptops or shared resources.
            Default: Nextflow defaults to the total number of processors on your system.
    Nextflow Related Parameters:
        --infodir DIR Directory to write Nextflow summary files to
            Default: ${params.infodir}
        --condadir DIR Directory Nextflow should use for Conda environments
            Default: Bactopia's Nextflow directory
        --nfconfig STR A Nextflow compatible config file for custom profiles. This allows
            you to create profiles specific to your environment (e.g. SGE,
            AWS, SLURM, etc...). This config file is loaded last and will
            overwrite existing variables if set.
            Default: Bactopia's default configs
        --nfdir Print the directory Nextflow has pulled Bactopia to
        --overwrite Nextflow will overwrite existing output files.
            Default: ${params.overwrite}
        --containerPath Path to Singularity containers to be used by the 'slurm'
            profile.
            Default: ${params.containerPath}
        --sleep_time After reading datasets, the amount of time (seconds) Nextflow
            will wait before execution.
            Default: ${params.sleep_time} seconds
        --publish_mode Set Nextflow's method for publishing output files. Allowed methods are:
            'copy' (default) Copies the output files into the published directory.
            'copyNoFollow' Copies the output files into the published directory
                without following symlinks, i.e. copies the links themselves.
            'link' Creates a hard link in the published directory for each
                process output file.
            'rellink' Creates a relative symbolic link in the published directory
                for each process output file.
            'symlink' Creates an absolute symbolic link in the published directory
                for each process output file.
            Default: ${params.publish_mode}
        --force Nextflow will overwrite existing output files.
            Default: ${params.force}
        -resume Nextflow will attempt to resume a previous run. Please notice it is
            only a single '-'
        --cleanup_workdir After Bactopia is successfully executed, the work directory will be deleted.
            Warning: by doing this you lose the ability to resume workflows.
    Useful Parameters:
        --skip_logs Logs for each process per sample will not be kept.
        --available_datasets Print a list of available datasets found based
            on location given by "--datasets"
        --example_fastqs Print example of expected input for FASTQs file
        --check_fastqs Verify "--fastqs" produces the expected inputs
        --compress Compress (gzip) select outputs (e.g. annotation, variant calls)
            to reduce overall storage footprint.
        --keep_all_files Keeps all analysis files created. By default, intermediate
            files are removed. This will not affect the ability
            to resume Nextflow runs, and only occurs at the end
            of the process.
        --version Print workflow version information
        --help Show this message and exit
        --help_all Show a complete list of adjustable parameters
    """
}
// main script name
manifest {
    author = 'Robert A. Petit III'
    name = 'bactopia-wrapper'
    homePage = 'https://github.com/bactopia/bactopia'
    description = 'A wrapper for Bactopia to be used with AWS Batch.'
    mainScript = 'main.nf'
    version = '1.4.11'
    nextflowVersion = '>=19'
}

// Container/Conda env version
container_version = '1.4.x'

// Default parameters
params {
    // Bactopia
    accession = null
    accessions = null
    bactopia = '/home/ec2-user/bactopia'
    datasets = '/home/ec2-user/datasets'
    conda = '/home/ec2-user/bactopia-envs'
    publish_mode = 'copy'
    overwrite = false
    cpus = 4
    outdir = './'
    infodir = './'
    help = null
    version = null
}
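// Any of these can be overridden on the command line at launch,
// e.g. "--cpus 8 --outdir /scratch/results" (paths here are only illustrative).
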
process {
    // Defaults
    cpus = {params.cpus * task.attempt}
    memory = {16.GB * task.attempt}
    time = {90.m * task.attempt}
    errorStrategy = 'retry'
    maxRetries = 5
}
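// With errorStrategy 'retry', attempt N scales the requests above N-fold:
// e.g. the second attempt asks for 8 CPUs, 32 GB of memory, and 180 minutes.
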
profiles {
    standard {}

    awsbatch {
        executor {
            name = 'awsbatch'
            awscli = '/home/ec2-user/miniconda/bin/aws'
            queueSize = 500
        }
        aws {
            region = 'us-east-1'
            client {
                uploadStorageClass = 'STANDARD_IA'
            }
            batch {
                cliPath = '/home/ec2-user/miniconda3/bin/aws'
                maxParallelTransfers = 8
                delayBetweenAttempts = 15
                maxTransferAttempts = 5
                volumes = ['/home/ec2-user/bactopia', '/home/ec2-user/datasets', '/home/ec2-user/bactopia-envs']
            }
        }
        process {
            executor = 'awsbatch'
            queue = 'nf-batch-spot'
            container = "bactopia/aws-bactopia:${container_version}"
        }
        docker {
            enabled = true
            runOptions = '-u $(id -u):$(id -g)'
        }
    }
}

// Reporting configuration
timeline {
    enabled = true
    file = "${params.infodir}/bactopia-info/bactopia-timeline.html"
}

report {
    enabled = true
    file = "${params.infodir}/bactopia-info/bactopia-report.html"
}

trace {
    enabled = true
    file = "${params.infodir}/bactopia-info/bactopia-trace.txt"
    fields = 'task_id,hash,native_id,process,tag,name,status,exit,module,container,cpus,time,disk,memory,attempt,start,complete,duration,realtime,queue,%cpu,%mem,rss,vmem'
}
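
// Example launch (hypothetical file and bucket names; adjust to your setup):
//   nextflow run main.nf -profile awsbatch -w s3://my-bucket/work \
//       --accessions accessions.txt --outdir s3://my-bucket/results
// The 'standard' profile runs locally using the defaults defined in params{} above.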