Last active
April 22, 2020 15:58
-
-
Save danielecook/7fae638ca7745efae4358d0584483255 to your computer and use it in GitHub Desktop.
Parse nextflow output with -dump-hashes enabled for comparison #nextflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import re | |
import io | |
import pandas as pd | |
import numpy as np | |
# Usage/help text printed when the script is invoked with no arguments.
# NOTE(review): the gist scrape flattened indentation; the original literal
# may have indented the Usage examples — confirm against the source gist.
HELP_MSG = """
diff hashes
Author: Daniel Cook
This program cleans up output from a nextflow run with the -dump-hashes option enabled.
It is designed to help debug where caching issues occur with nextflow.
Usage:
# Run nextflow with -dump-hashes
nextflow run main.nf -resume -dump-hashes > log1.txt
nextflow run main.nf -resume -dump-hashes > log2.txt
python diff_hashes.py log1.txt log2.txt
"""
def generate_csv(fnames):
    """Parse nextflow ``-dump-hashes`` log files into tab-separated rows.

    Yields a header row, then one row per hashed task input with columns:
    filename, process, process_hash, value_hash, value_type, value.

    Parameters
    ----------
    fnames : iterable of str
        Paths of nextflow log files produced with ``-dump-hashes``.

    Yields
    ------
    str
        A single tab-separated line.
    """
    def _row(*fields):
        # Local equivalent of output_line() so this generator is self-contained.
        return "\t".join(fields)

    yield _row("filename", "process", "process_hash", "value_hash", "value_type", "value")
    for fname in fnames:
        last_cache_line = 0        # line index of the most recent hash entry
        output_last = False        # True right after a "cache hash:" header
        value_open = False         # currently accumulating a multi-line value
        value = None               # pending (not yet emitted) value text
        value_hash = None
        value_type = None
        process_name = None
        process_cache_hash = None  # init so a malformed log fails predictably
        with open(fname, 'r') as f:
            for n, line in enumerate(f):
                if "cache hash:" in line:
                    # Header looks like: "[proc name] cache hash: <32 hex>; ..."
                    cloc = line.find("cache hash:")
                    process_name = line[1:cloc - 2]
                    process_cache_hash = line[cloc + 12:cloc + 12 + 32]
                    output_last = True
                cache_line = re.match(r"^\W+([a-f0-9]{32}) \[([^\\]+?)\] (.*)", line)
                # A hash entry on the very next line, or a new process header,
                # means the pending value is complete: flush it.
                line_diff = n - last_cache_line
                if (cache_line and line_diff == 1) or (output_last and value is not None):
                    # Guard: value may still be None here (e.g. the first hash
                    # entry of the file); the original crashed on value.strip().
                    if value is not None:
                        yield _row(fname, process_name, process_cache_hash,
                                   value_hash, value_type, repr(value.strip()))
                    value = None       # mark as flushed so it is not re-emitted
                    value_open = False
                    output_last = False
                if cache_line:
                    value_hash = cache_line.group(1)
                    value_type = cache_line.group(2)
                    value = cache_line.group(3).strip()
                    value_open = True
                    last_cache_line = n
                elif line.strip("\n\r") == " ":
                    # A line holding a single space appears to be the only way
                    # in which the end of a value is signalled... for now.
                    if value_open and value is not None:
                        # BUG FIX: the original built this row but never
                        # yielded it, losing values terminated mid-process.
                        yield _row(fname, process_name, process_cache_hash,
                                   value_hash, value_type, repr(value.strip()))
                    value = None
                    value_open = False
                elif value_open:
                    value += repr(line)
        # Flush the final value of the file, if one is still pending.
        # (Guard avoids the original's crash on files with no cache entries
        # and avoids re-emitting an already-flushed value.)
        if value is not None:
            yield _row(fname, process_name, process_cache_hash,
                       value_hash, value_type, repr(value.strip()))
def output_line(*k):
    """Return the given fields joined into a single tab-separated record."""
    separator = "\t"
    return separator.join(k)
if __name__ == "__main__":
    # With no log files given, show usage and stop.
    if len(sys.argv) == 1:
        print(HELP_MSG)
        # BUG FIX: the original fell through and ran the pipeline on an
        # empty file list, emitting a spurious header-only TSV.
        sys.exit(0)
    # Parse up to two logs into an in-memory TSV and load it with pandas.
    raw = io.StringIO('\n'.join(generate_csv(sys.argv[1:3])))
    df = pd.read_csv(raw, sep="\t")
    # Optional: drop absolute duplicates (left disabled by the author).
    #df = df[df.duplicated(subset=['process', 'process_hash', 'value_hash', 'value_type']) == False]
    # Keep only processes that appear in both log files.
    df = df.groupby(by=["process"]).filter(lambda x: len(x["filename"].unique()) == 2)
    # Keep (process, value_hash) pairs seen exactly once across the two logs,
    # i.e. the hash entries that DIFFER between the runs (the original
    # comment said "present in both", which contradicts len(x) == 1).
    df = df.groupby(by=["process", "value_hash"]).filter(lambda x: len(x) == 1)
    # Rank rows within each (filename, process) so matching entries line up
    # when sorted side by side.
    df["rank"] = df.groupby(by=["filename", "process"]).cumcount() + 1
    # sys.stdout is portable, unlike the original hard-coded "/dev/stdout".
    df.sort_values(["process", "rank", "filename"]).to_csv(sys.stdout, sep="\t")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment