Skip to content

Instantly share code, notes, and snippets.

@danielecook
Last active April 22, 2020 15:58
Show Gist options
  • Save danielecook/7fae638ca7745efae4358d0584483255 to your computer and use it in GitHub Desktop.
Parse nextflow output with -dump-hashes enabled for comparison #nextflow
#!/usr/bin/env python
import sys
import re
import io
import pandas as pd
import numpy as np
# Usage/help text printed when the script is invoked with no log-file arguments.
HELP_MSG = """
diff hashes
Author: Daniel Cook
This program cleans up output from a nextflow run with the -dump-hashes option enabled.
It is designed to help debug where caching issues occur with nextflow.
Usage:
# Run nextflow with -dump-hashes
nextflow run main.nf -resume -dump-hashes > log1.txt
nextflow run main.nf -resume -dump-hashes > log2.txt
python diff_hashes.py log1.txt log2.txt
"""
def generate_csv(fnames):
    """Parse nextflow `-dump-hashes` logs into tab-separated rows.

    Yields a header row first, then one row per hashed input value found in
    each file in *fnames*. Each row has the columns:
    filename, process, process_hash, value_hash, value_type, value.

    Fixes over the original gist:
    - the row built when a single-space terminator line is seen was discarded
      instead of yielded;
    - rows are no longer emitted (and the generator no longer crashes on
      ``None.strip()``) before any value has been captured;
    - ``process_cache_hash`` is initialised so a malformed log cannot trigger
      an UnboundLocalError;
    - a captured value is cleared once emitted, so it cannot be emitted twice.
    """
    yield "\t".join(("filename", "process", "process_hash",
                     "value_hash", "value_type", "value"))
    # Matches one hashed-value line:  <32-hex hash> [<type>] <value>
    value_re = re.compile(r"^\W+([a-f0-9]{32}) \[([^\\]+?)\] (.*)")
    for fname in fnames:
        last_cache_line = 0      # line number of the previous value line
        output_last = False      # a new "cache hash:" header was seen
        value_open = False       # currently accumulating a multi-line value
        value = None
        value_hash = None
        value_type = None
        process_name = None
        process_cache_hash = None
        with open(fname, 'r') as f:
            for n, line in enumerate(f):
                if "cache hash:" in line:
                    # Header looks like: [<process>] cache hash: <32-hex> ...
                    cloc = line.find("cache hash:")
                    process_name = line[1:cloc - 2]
                    process_cache_hash = line[cloc + 12:cloc + 12 + 32]
                    output_last = True
                cache_line = value_re.match(line)
                # Emit the pending value when a new value line follows
                # immediately, or when a new process header was seen.
                line_diff = n - last_cache_line
                if value is not None and ((cache_line and line_diff == 1)
                                          or output_last):
                    yield "\t".join((fname, process_name, process_cache_hash,
                                     value_hash, value_type,
                                     repr(value.strip())))
                    value = None
                    value_open = False
                    output_last = False
                # Then process the next cache line
                if cache_line:
                    value_hash = cache_line.group(1)
                    value_type = cache_line.group(2)
                    value = cache_line.group(3).strip()
                    value_open = True
                    last_cache_line = n
                elif line.strip("\n\r") == " ":
                    # A line holding a single space appears to be the only way
                    # the end of a value is signalled... for now.
                    if value_open and value is not None:
                        yield "\t".join((fname, process_name,
                                         process_cache_hash, value_hash,
                                         value_type, repr(value.strip())))
                        value = None
                    value_open = False
                elif value_open:
                    # Continuation of a multi-line value.
                    value += repr(line)
        # Flush any value still pending at end-of-file.
        if value is not None:
            yield "\t".join((fname, process_name, process_cache_hash,
                             value_hash, value_type, repr(value.strip())))
def output_line(*fields):
    """Return the given fields joined into a single tab-separated line."""
    separator = "\t"
    return separator.join(fields)
if __name__ == "__main__":
    if len(sys.argv) == 1:
        # No log files supplied: print usage and stop, instead of falling
        # through and running the pipeline on an empty file list.
        print(HELP_MSG)
        sys.exit(0)
    # Parse (up to) the first two log files into one tab-separated table.
    raw = io.StringIO('\n'.join(generate_csv(sys.argv[1:3])))
    df = pd.read_csv(raw, sep="\t")
    # First drop absolute duplicates; These are not helpful.
    #df = df[df.duplicated(subset=['process', 'process_hash', 'value_hash', 'value_type']) == False]
    # Keep only processes that appear in both log files.
    df = df.groupby(by=["process"]).filter(lambda x: len(x["filename"].unique()) == 2)
    # Keep only (process, value_hash) pairs seen exactly once -- i.e. the
    # hashes that differ between the two runs.
    df = df.groupby(by=["process", "value_hash"]).filter(lambda x: len(x) == 1)
    # Rank values within each (filename, process) so the two runs line up
    # side by side after sorting.
    df["rank"] = df.groupby(by=["filename", "process"]).cumcount() + 1
    # sys.stdout is portable; the original "/dev/stdout" path is Linux-only.
    df.sort_values(["process", "rank", "filename"]).to_csv(sys.stdout, sep="\t")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment