Last active
November 29, 2023 00:45
-
-
Save marcelm/7e432275dfa5e762b4f8 to your computer and use it in GitHub Desktop.
pure Python module that uses snakemake to construct and run a workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Running this script is (intended to be) equivalent to running the following Snakefile: | |
include: "pipeline.conf" # Should be an empty file | |
shell.prefix("set -euo pipefail;") | |
rule all: | |
input: | |
"reads.counts" | |
rule unpack_fastq: | |
'''Unpack a FASTQ file''' | |
output: "{file}.fastq" | |
input: "{file}.fastq.gz" | |
resources: time=60, mem=100 | |
params: "{file}.params" | |
threads: 8 | |
log: 'unpack.log' | |
shell: | |
'''zcat {input} > {output} | |
echo finished 1>&2 {log} | |
''' | |
rule count: | |
'''Count reads in a FASTQ file''' | |
output: counts="{file}.counts" | |
input: fastq="{file}.fastq" | |
run: | |
n = 0 | |
with open(input.fastq) as f: | |
for _ in f: | |
n += 1 | |
with open(output.counts, 'w') as f: | |
print(n / 4, file=f) | |
""" | |
from snakemake.workflow import Workflow, Rules | |
import snakemake.workflow | |
from snakemake import shell | |
from snakemake.logging import setup_logger | |
setup_logger() | |
workflow = Workflow(__file__) | |
snakemake.workflow.rules = Rules() | |
snakemake.workflow.config = dict() | |
### Output from snakemake --print-compilation follows (reformatted) | |
workflow.include("pipeline.conf") | |
shell.prefix("set -euo pipefail;") | |
@workflow.rule(name='all', lineno=6, snakefile='.../Snakefile') | |
@workflow.input("reads.counts") | |
@workflow.norun() | |
@workflow.run | |
def __all(input, output, params, wildcards, threads, resources, log, version): | |
pass | |
@workflow.rule(name='unpack_fastq', lineno=17, snakefile='.../Snakefile') | |
@workflow.docstring("""Unpack a FASTQ file""") | |
@workflow.output("{file}.fastq") | |
@workflow.input("{file}.fastq.gz") | |
@workflow.resources(time=60, mem=100) | |
@workflow.params("{file}.params") | |
@workflow.threads(8) | |
@workflow.log('unpack.log') | |
@workflow.shellcmd( | |
"""zcat {input} > {output} | |
echo finished 1>&2 {log} | |
""" | |
) | |
@workflow.run | |
def __unpack_fastq(input, output, params, wildcards, threads, resources, log, version): | |
shell("""zcat {input} > {output} | |
echo finished 1>&2 > {log} | |
""" | |
) | |
@workflow.rule(name='count', lineno=52, snakefile='.../Snakefile') | |
@workflow.docstring("""Count reads in a FASTQ file""") | |
@workflow.output(counts = "{file}.counts") | |
@workflow.input(fastq = "{file}.fastq") | |
@workflow.run | |
def __count(input, output, params, wildcards, threads, resources, log, version): | |
n = 0 | |
with open(input.fastq) as f: | |
for _ in f: | |
n += 1 | |
with open(output.counts, 'w') as f: | |
print(n / 4, file=f) | |
### End of output from snakemake --print-compilation | |
workflow.check() | |
print("Dry run first ...") | |
workflow.execute(dryrun=True, updated_files=[]) | |
print("And now for real") | |
workflow.execute(dryrun=False, updated_files=[], resources=dict()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment