{
"cells": [
{
"cell_type": "markdown",
"id": "f7ef44c3-3167-47e4-afc2-d074415310af",
"metadata": {},
"source": [
"# Data-dependent conditional execution in Snakemake\n",
"\n",
"From documentation of ['Data-dependent conditional execution' in the Snakemake documentation](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution) they list a 'complete example' and this is working through that in reproducible way via MyBinder in modern Snakemake.\n",
"\n",
"Came up in course of working out [MissingWildcard.ipynb](https://github.com/an-way/Sandbox/blob/main/MissingWildcard.ipynb), from https://stackoverflow.com/q/78944571/8508004 , which follows from https://stackoverflow.com/a/78893777/8508004 & https://gist.github.com/fomightez/99f9056649e1ca8128563602b2eb95f9. \n",
"\n",
"\n",
"This effort uses the same environment in MyBinder sessions launched as described in the header of [the notebook there](https://gist.github.com/fomightez/99f9056649e1ca8128563602b2eb95f9).\n",
"\n",
"\n",
"------\n",
"The complete example code block provided in the documentation section ['Data-dependent conditional execution'](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution) (storing here as I will add additional things, plus documentation changes over time and want to be accurate in what was there at the time & how adapted):\n",
"\n",
"```python\n",
"# a target rule to define the desired final output\n",
"rule all:\n",
" input:\n",
" \"aggregated/a.txt\",\n",
" \"aggregated/b.txt\"\n",
"\n",
"rule clean:\n",
" params:\n",
" od = OUTDIR,\n",
" sd = SNDDIR\n",
" shell:\n",
" '''\n",
" touch sampleD_merged.txt\n",
" rm sample*_merged.txt\n",
" rm -rf {params.od}\n",
" rm -rf {params.sd}\n",
" '''\n",
"\n",
"\n",
"# the checkpoint that shall trigger re-evaluation of the DAG\n",
"checkpoint somestep:\n",
" input:\n",
" \"samples/{sample}.txt\"\n",
" output:\n",
" \"somestep/{sample}.txt\"\n",
" shell:\n",
" # simulate some output value\n",
" \"echo {wildcards.sample} > somestep/{wildcards.sample}.txt\"\n",
"\n",
"\n",
"# intermediate rule\n",
"rule intermediate:\n",
" input:\n",
" \"somestep/{sample}.txt\"\n",
" output:\n",
" \"post/{sample}.txt\"\n",
" shell:\n",
" \"touch {output}\"\n",
"\n",
"\n",
"# alternative intermediate rule\n",
"rule alt_intermediate:\n",
" input:\n",
" \"somestep/{sample}.txt\"\n",
" output:\n",
" \"alt/{sample}.txt\"\n",
" shell:\n",
" \"touch {output}\"\n",
"\n",
"\n",
"# input function for the rule aggregate\n",
"def aggregate_input(wildcards):\n",
" # decision based on content of output file\n",
" # Important: use the method open() of the returned file!\n",
" # This way, Snakemake is able to automatically download the file if it is generated in\n",
" # a cloud environment without a shared filesystem.\n",
" with checkpoints.somestep.get(sample=wildcards.sample).output[0].open() as f:\n",
" if f.read().strip() == \"a\":\n",
" return \"post/{sample}.txt\"\n",
" else:\n",
" return \"alt/{sample}.txt\"\n",
"\n",
"\n",
"rule aggregate:\n",
" input:\n",
" aggregate_input\n",
" output:\n",
" \"aggregated/{sample}.txt\"\n",
" shell:\n",
" \"touch {output}\"\n",
"```\n",
"\n",
"Plus about this it says:\n",
"\n",
">\"Inside the function, we first retrieve the output files of the checkpoint somestep with the wildcards, passing through the value of the wildcard sample. Upon execution, if the checkpoint is not yet complete, Snakemake will record somestep as a direct dependency of the rule aggregate. Once somestep has finished for a given sample, the input function will automatically be re-evaluated and the method get will no longer return an exception. Instead, the output file will be opened, and depending on its contents either \"post/{sample}.txt\" or \"alt/{sample}.txt\" will be returned by the input function. This way, the DAG becomes conditional on some produced data.\"\n",
"\n",
"------"
]
},
{
"cell_type": "markdown",
"id": "79c8097f-7e5f-4819-9efc-053469673747",
"metadata": {},
"source": [
"Use current version of Snakemake matching [OP's](https://stackoverflow.com/q/78944571/8508004), which is more recent than when dealing with [previous qestion](https://stackoverflow.com/a/78893777/8508004):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2d28ba94-d826-4a5d-9017-e678c258b52f",
"metadata": {},
"outputs": [],
"source": [
"%pip install snakemake==8.18.2"
]
},
{
"cell_type": "markdown",
"id": "8e2488ea-cd9b-4162-9470-2e1813f460f4",
"metadata": {},
"source": [
"-----------"
]
},
{
"cell_type": "markdown",
"id": "4e9178d5-a50b-4a4f-8dc9-9a77c4fd6904",
"metadata": {},
"source": [
"With the environment now set up, now set to try and set up the documentation example for 'Data-dependent conditional execution' and make it better with a little additional stuff so I can reset and rerun super easily and run in MyBinder sessions for interating on developing related code.\n",
"\n",
"The code seems to expect to start off with two files, `a.txt` and `b.txt`, in a folder `samples`.\n",
"\n",
"(I know this because running the Snakefile from the default without that step gives:\n",
"\n",
"```python\n",
"Missing input files for rule somestep:\n",
" output: somestep/a.txt\n",
" wildcards: sample=a\n",
" affected files:\n",
" samples/a.txt\n",
"```\n",
")\n",
"\n",
"Let's create those files."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "04d65894-d344-4840-a081-779d8ce22510",
"metadata": {},
"outputs": [],
"source": [
"!mkdir -p samples #`-p` flag means it will be made if not there and no issue if it already exists\n",
"!touch samples/a.txt\n",
"!touch samples/b.txt"
]
},
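{
"cell_type": "markdown",
"id": "check-samples-note",
"metadata": {},
"source": [
"(Optional sanity check I added beyond the documentation example: uncomment & run the next cell to list the `samples` directory and confirm the two starting files are in place.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "check-samples-code",
"metadata": {},
"outputs": [],
"source": [
"#!ls samples/"
]
},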
{
"cell_type": "markdown",
"id": "1d43a0d9-20b8-4dfd-9a59-1675ab159d34",
"metadata": {},
"source": [
"-------------\n",
"\n",
"##### Make the Snakefile"
]
},
{
"cell_type": "markdown",
"id": "2e31f94c-06d8-44b7-9b8c-f11c706257a1",
"metadata": {},
"source": [
"Note that in addition to `clean` rule, I changed names of rules and directories to be less abstract & better suit what is happening, IMO. \n",
"Otherwise these are all the same steps as in the documentation version shown above."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "6e8a80c1-250a-48dd-b259-7844d165fbc3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting Snakefile\n"
]
}
],
"source": [
"%%writefile Snakefile\n",
"# a target rule to define the desired final output\n",
"rule all:\n",
" input:\n",
" \"record_of_files_sorted/a_was_sorted.txt\",\n",
" \"record_of_files_sorted/b_was_sorted.txt\"\n",
"\n",
"# I, Wayne, added this clean run beyond the code provided in the documentaiton\n",
"rule clean:\n",
" params:\n",
" samples_dir = \"files_with_content\",\n",
" alt_dir = \"not_containing_letter_a\",\n",
" post_dir = \"files_containing_letter_a\",\n",
" record_dir = \"record_of_files_sorted\",\n",
" shell:\n",
" '''\n",
" rm -rf {params.samples_dir}\n",
" rm -rf {params.alt_dir}\n",
" rm -rf {params.post_dir}\n",
" rm -rf {params.record_dir}\n",
" '''\n",
"\n",
"\n",
"# the checkpoint that shall trigger re-evaluation of the DAG\n",
"checkpoint add_content_in_sample_files:\n",
" input:\n",
" \"samples/{sample}.txt\"\n",
" output:\n",
" \"files_with_content/{sample}.txt\"\n",
" shell:\n",
" # simulate some output value (here the sample name of the file is added as file content)\n",
" \"echo {wildcards.sample} > files_with_content/{wildcards.sample}.txt\"\n",
"\n",
"\n",
"# intermediate rule\n",
"rule handle_files_containing_letter_a:\n",
" input:\n",
" \"files_with_content/{sample}.txt\"\n",
" output:\n",
" \"files_containing_letter_a/{sample}.txt\"\n",
" shell:\n",
" \"touch {output}\"\n",
"\n",
"\n",
"# alternative intermediate rule\n",
"rule handle_files_WITHOUT_letter_a:\n",
" input:\n",
" \"files_with_content/{sample}.txt\"\n",
" output:\n",
" \"not_containing_letter_a/{sample}.txt\"\n",
" shell:\n",
" \"touch {output}\"\n",
"\n",
"\n",
"# input function for the rule sort_and_make_record\n",
"def sort_on_data_input(wildcards):\n",
" # decision based on content of output file\n",
" # Important: use the method open() of the returned file!\n",
" # This way, Snakemake is able to automatically download the file if it is generated in\n",
" # a cloud environment without a shared filesystem.\n",
" with checkpoints.add_content_in_sample_files.get(sample=wildcards.sample).output[0].open() as f:\n",
" if f.read().strip() == \"a\":\n",
" return \"files_containing_letter_a/{sample}.txt\"\n",
" else:\n",
" return \"not_containing_letter_a/{sample}.txt\"\n",
"\n",
"\n",
"rule sort_and_make_record:\n",
" input:\n",
" sort_on_data_input\n",
" output:\n",
" \"record_of_files_sorted/{sample}_was_sorted.txt\" \n",
" shell:\n",
" \"touch {output}\" # make the flag files detailing step carried out"
]
},
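{
"cell_type": "markdown",
"id": "dryrun-preview-note",
"metadata": {},
"source": [
"(Optional step I added beyond the documentation example: uncomment & run the next cell to preview the planned jobs with Snakemake's dry-run flag, `-n`, before executing anything. Because of the checkpoint, the part of the DAG past `add_content_in_sample_files` cannot be fully resolved in the preview; it only gets determined once the checkpoint output actually exists.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dryrun-preview-code",
"metadata": {},
"outputs": [],
"source": [
"#!snakemake -n -c 1"
]
},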
{
"cell_type": "markdown",
"id": "e125f8d9-7fec-419e-b08a-e440fb050e25",
"metadata": {},
"source": [
"-------------\n",
"\n",
"After things set up....\n",
"\n",
"#### Run the file `Snakefile`, which contains my edited version of documentation pipeline:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "c7d411bc-0329-4ea8-acc8-cef66d5270a7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[33mAssuming unrestricted shared filesystem usage.\u001b[0m\n",
"\u001b[33mBuilding DAG of jobs...\u001b[0m\n",
"\u001b[33mUsing shell: /usr/bin/bash\u001b[0m\n",
"\u001b[33mProvided cores: 1 (use --cores to define parallelism)\u001b[0m\n",
"\u001b[33mRules claiming more threads will be scaled down.\u001b[0m\n",
"\u001b[33mJob stats:\n",
"job count\n",
"--------------------------- -------\n",
"add_content_in_sample_files 2\n",
"all 1\n",
"sort_and_make_record 2\n",
"total 5\n",
"\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalcheckpoint add_content_in_sample_files:\n",
" input: samples/a.txt\n",
" output: files_with_content/a.txt\n",
" jobid: 2\n",
" reason: Missing output files: <TBD>\n",
" wildcards: sample=a\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[33mDAG of jobs will be updated after completion.\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 2.\u001b[0m\n",
"\u001b[32m1 of 5 steps (20%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalrule handle_files_containing_letter_a:\n",
" input: files_with_content/a.txt\n",
" output: files_containing_letter_a/a.txt\n",
" jobid: 8\n",
" reason: Missing output files: files_containing_letter_a/a.txt\n",
" wildcards: sample=a\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 8.\u001b[0m\n",
"\u001b[32m2 of 6 steps (33%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalcheckpoint add_content_in_sample_files:\n",
" input: samples/b.txt\n",
" output: files_with_content/b.txt\n",
" jobid: 4\n",
" reason: Missing output files: <TBD>\n",
" wildcards: sample=b\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[33mDAG of jobs will be updated after completion.\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 4.\u001b[0m\n",
"\u001b[32m3 of 6 steps (50%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalrule handle_files_WITHOUT_letter_a:\n",
" input: files_with_content/b.txt\n",
" output: not_containing_letter_a/b.txt\n",
" jobid: 11\n",
" reason: Missing output files: not_containing_letter_a/b.txt\n",
" wildcards: sample=b\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 11.\u001b[0m\n",
"\u001b[32m4 of 7 steps (57%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalrule sort_and_make_record:\n",
" input: not_containing_letter_a/b.txt\n",
" output: record_of_files_sorted/b_was_sorted.txt\n",
" jobid: 3\n",
" reason: Missing output files: record_of_files_sorted/b_was_sorted.txt; Input files updated by another job: not_containing_letter_a/b.txt\n",
" wildcards: sample=b\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 3.\u001b[0m\n",
"\u001b[32m5 of 7 steps (71%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalrule sort_and_make_record:\n",
" input: files_containing_letter_a/a.txt\n",
" output: record_of_files_sorted/a_was_sorted.txt\n",
" jobid: 1\n",
" reason: Missing output files: record_of_files_sorted/a_was_sorted.txt; Input files updated by another job: files_containing_letter_a/a.txt\n",
" wildcards: sample=a\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 1.\u001b[0m\n",
"\u001b[32m6 of 7 steps (86%) done\u001b[0m\n",
"\u001b[33mSelect jobs to execute...\u001b[0m\n",
"\u001b[33mExecute 1 jobs...\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mlocalrule all:\n",
" input: record_of_files_sorted/a_was_sorted.txt, record_of_files_sorted/b_was_sorted.txt\n",
" jobid: 0\n",
" reason: Input files updated by another job: record_of_files_sorted/b_was_sorted.txt, record_of_files_sorted/a_was_sorted.txt\n",
" resources: tmpdir=/tmp\u001b[0m\n",
"\u001b[32m\u001b[0m\n",
"\u001b[32m[Sat Sep 14 01:52:53 2024]\u001b[0m\n",
"\u001b[32mFinished job 0.\u001b[0m\n",
"\u001b[32m7 of 7 steps (100%) done\u001b[0m\n",
"\u001b[33mComplete log: .snakemake/log/2024-09-14T015253.227344.snakemake.log\u001b[0m\n"
]
}
],
"source": [
"!snakemake -c 1"
]
},
{
"cell_type": "markdown",
"id": "5aacb06d-8a5b-4b64-b464-d74ff4251844",
"metadata": {},
"source": [
"Show the directories present & files in each after Snakefile run completes:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "b8c4d68d-79f0-41af-97f5-34cc66308815",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"files_containing_letter_a/:\n",
"a.txt\n",
"\n",
"files_with_content/:\n",
"a.txt b.txt\n",
"\n",
"not_containing_letter_a/:\n",
"b.txt\n",
"\n",
"record_of_files_sorted/:\n",
"a_was_sorted.txt b_was_sorted.txt\n",
"\n",
"samples/:\n",
"a.txt b.txt\n"
]
}
],
"source": [
"ls */"
]
},
{
"cell_type": "markdown",
"id": "dc9ffef2-c23f-4d13-89da-23e0af54a7cb",
"metadata": {},
"source": [
"Sorted based on file content! \n",
"You can explore the content using the file browser pane on the left. \n",
"(Press the folder icon in the upper left, below the Jupyter symbol, if for some reason you don't see the file browser pane.)"
]
},
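{
"cell_type": "markdown",
"id": "inspect-contents-note",
"metadata": {},
"source": [
"(Another optional check I added: uncomment & run the next cell to display the contents of the checkpoint outputs that the sorting decision was based on; `head` prints each file's contents under a header naming the file.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "inspect-contents-code",
"metadata": {},
"outputs": [],
"source": [
"#!head files_with_content/*.txt"
]
},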
{
"cell_type": "markdown",
"id": "29a67633-331a-484d-8efd-70619df4fcdc",
"metadata": {},
"source": [
"------------------------------\n",
"\n",
"##### Optional/for development\n",
"\n",
"Uncomment line below and run it to reset things during development/testing."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "1a91953d-25a1-43d7-8da3-865729935e1c",
"metadata": {},
"outputs": [],
"source": [
"#!snakemake -c 8 clean"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}