Skip to content

Instantly share code, notes, and snippets.

@cr3a7ure
Forked from alevene/dag_dumper.py
Created December 4, 2020 19:04
Show Gist options
  • Save cr3a7ure/785f4bad48fe5b29c93fc8b7397ea722 to your computer and use it in GitHub Desktop.
Save cr3a7ure/785f4bad48fe5b29c93fc8b7397ea722 to your computer and use it in GitHub Desktop.
Dump DAG definitions to file/stdout
"""
Script to print DAG+Task information in a text format. This can be used to quickly compare against other branches
or versions to confirm the "compiled" version of a DAG is matching expectations.
Usage:
1. ~/airflow-install (branch) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_branch
2. ~/airflow-install (branch) $ git checkout development
3. ~/airflow-install (development) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_dev
4. Run comparison against the 2 output files
"""
import argparse
import inspect
import os
import re
import sys
import types

from airflow.models import DagBag
from airflow.models import DAG
from airflow.models import BaseOperator
# Number of spaces of indentation per drill level in the dump output.
INDENT_SIZE = 2
class AttrDumper:
    """
    Recursively print an object's public attributes as indented ``key = value`` lines.

    Containers are printed in sorted order and memory addresses are stripped from reprs, so two dumps
    of equivalent objects (e.g. the same DAG built from two git branches) can be diffed line by line.
    """

    # Matches the " at 0x7f..." part of a default repr such as "<Foo object at 0x7f...>".
    # Non-greedy by construction (it only spans the address), so every address in a repr that
    # contains several objects is removed, not just the last one.
    _ADDRESS_RE = re.compile(r' at 0x[0-9a-fA-F]+(?=>)')

    def __init__(self,
                 objects,
                 drill_classes=None,
                 max_levels=5,
                 ignore_attrs=None,
                 ignore_classes=None,
                 no_drill_attrs=None,
                 dump_fn_code=False,
                 dump_methods_for_classes=None,
                 collapse_multiline_str=False,
                 output_file=None,
                 indent_size=None):
        """
        @param objects: list of objects to dump and drill into
        @param drill_classes: only objects that are instances of these classes will be drilled into
        @param max_levels: maximum number of attribute levels to drill into
        @param ignore_attrs: if this attribute name is encountered, skip it completely (it is not even read)
        @param ignore_classes: if an object with this class is encountered, skip it completely
        @param no_drill_attrs: if this attribute name is encountered, dump its value but do not drill into it
        @param dump_fn_code: boolean whether or not to dump the source code of function attributes
        @param dump_methods_for_classes: list of classes that the source code of class methods should be dumped
        @param collapse_multiline_str: boolean whether or not to remove newlines when dumping string values
        @param output_file: file to send output to (overwritten, UTF-8). If None, send to stdout
        @param indent_size: spaces of indentation per drill level. If None, the module-level INDENT_SIZE is used
        """
        self.objects = objects
        self.drill_classes = drill_classes
        self.max_levels = max_levels
        self.ignore_attrs = ignore_attrs or []
        self.ignore_classes = ignore_classes or []
        self.no_drill_attrs = no_drill_attrs or []
        self.dump_fn_code = dump_fn_code
        self.dump_methods_for_classes = dump_methods_for_classes or []
        self.collapse_multiline_str = collapse_multiline_str
        self.output_file = output_file
        self.indent_size = INDENT_SIZE if indent_size is None else indent_size
        # ids of objects already drilled into (prevents repeats and infinite recursion on cycles)
        self.obj_seen = set()
        # Strong references to every object whose id is in obj_seen. Without them a temporary object
        # (e.g. a property's return value) could be freed and its id reused by a different object,
        # which would then be wrongly treated as already seen.
        self._seen_refs = []
        self.file_handle = None

    def run(self):
        """
        Execute the dump/drill process for all objects, in order of their repr.

        The output file (if any) is closed and the seen-object bookkeeping is reset even if dumping raises.
        """
        if self.output_file:
            self.file_handle = open(self.output_file, 'w', encoding='utf-8')
        else:
            self.file_handle = sys.stdout
        try:
            for obj in sorted(self.objects, key=repr):
                self._dump(obj)
        finally:
            if self.file_handle is not sys.stdout:
                self.file_handle.close()
            self.file_handle = None
            self.obj_seen = set()
            self._seen_refs = []

    def _dump(self, obj=None, level=0, key=''):
        """
        Print (to file or stdout) the current object and conditionally/recursively drill into its attributes.
        @param obj: object to dump and then drill into
        @param level: depth level of attribute drilling
        @param key: the key or name of the incoming object
        """
        self._print(level, key, obj)
        if not self._should_drill(obj, level, key):
            return
        self.obj_seen.add(id(obj))
        self._seen_refs.append(obj)
        for attr in dir(obj):
            # Filter by name before reading the value, so ignored and private properties are never
            # evaluated (they may be slow, have side effects, or raise).
            if attr in self.ignore_attrs or attr.startswith('_'):
                continue
            try:
                val = getattr(obj, attr)
            except Exception as exc:  # a property may raise anything; record it rather than abort the dump
                val = '<error: {}: {}>'.format(type(exc).__name__, exc)
            if self.ignore_classes and isinstance(val, tuple(self.ignore_classes)):
                continue
            self._dump_attr(attr, val, level + 1)

    def _should_drill(self, obj, level, key):
        """Return True if ``obj``'s attributes should be explored after ``obj`` itself was printed."""
        if isinstance(obj, (int, float, str, list, dict, set)):
            return False  # primitives/containers: their printed value is already complete
        if id(obj) in self.obj_seen:
            return False  # already dumped
        if level > self.max_levels:
            return False
        if self.drill_classes and not isinstance(obj, tuple(self.drill_classes)):
            return False  # not a white-listed type to explore
        return key not in self.no_drill_attrs

    def _dump_attr(self, attr, val, level):
        """
        Dump a single attribute value found at drill depth ``level``.

        Lists/sets/dicts get a type header line followed by one line per element (sorted by repr), unless
        ``attr`` is in no_drill_attrs, in which case the whole sorted container is printed on one line.
        Functions (when dump_fn_code) and methods bound to dump_methods_for_classes are printed as source.
        """
        if isinstance(val, (list, set, dict)):
            if attr in self.no_drill_attrs:
                # "dump but do not drill": show the container's contents instead of just its type
                self._dump(val, level, attr)
                return
            self._dump(str(type(val)), level, attr)
            if isinstance(val, dict):
                entries = sorted(val.items(), key=lambda kv: repr(kv[0]))
            else:
                entries = enumerate(sorted(val, key=repr))
            for k, v in entries:
                self._dump(v, level + 1, '{}[{}]'.format(attr, k))
        elif isinstance(val, types.FunctionType) and self.dump_fn_code:
            self._dump(self._source_or_value(val), level, attr)
        elif (isinstance(val, types.MethodType)
              and isinstance(val.__self__, tuple(self.dump_methods_for_classes))):
            self._dump(self._source_or_value(val), level, attr)
        else:
            self._dump(val, level, attr)

    @staticmethod
    def _source_or_value(fn):
        """Return the source code of ``fn``, or ``fn`` itself when its source cannot be retrieved."""
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            # e.g. functions created via exec()/compile(), or whose .py file is not available
            return fn

    @classmethod
    def _strip_addresses(cls, text):
        """Remove ' at 0x...' memory addresses from object reprs to avoid comparison false-positives."""
        return cls._ADDRESS_RE.sub('', text)

    def _print(self, level, key, value):
        """
        Format and write a key/value line.
        @param level: drill depth (controls indentation)
        @param key: key to print; omitted when empty
        @param value: value to print; containers are printed sorted so output is deterministic
        """
        if isinstance(value, str):
            if self.collapse_multiline_str:
                value_print = value.replace('\n', '\\n')
            else:
                # indent continuation lines one level deeper than the key
                value_print = value.replace('\n', '\n' + (level + 1) * self.indent_size * ' ')
        elif isinstance(value, (list, set)):
            value_print = repr(sorted(value, key=repr))
        elif isinstance(value, dict):
            value_print = repr(sorted(value.items(), key=lambda kv: repr(kv[0])))
        else:
            value_print = repr(value)
        value_print = self._strip_addresses(value_print)
        self.file_handle.write('{}{}{}\n'.format(
            level * self.indent_size * ' ',
            key + ' = ' if key else '',
            value_print))
def run(dags_dir, dag_list, output_dir):
    """
    Run through all specified dags and dump them. The parameters are tweaked to limit the results to avoid noise in
    the resulting output.

    Requested dag ids that are not in the DagBag, and DAG files that failed to import, are reported on stderr so
    that a comparison of two dumps cannot silently miss a DAG.
    @param dags_dir: where to find the dags
    @param dag_list: specific list of dag ids to dump. If None, dump all
    @param output_dir: directory to write one file per dag (named after the dag id); created if missing.
        If None, send to stdout
    @return: None
    """
    dag_bag = DagBag(dags_dir)
    for filename in sorted(dag_bag.import_errors):
        sys.stderr.write('Import error in {}: {}\n'.format(filename, dag_bag.import_errors[filename]))

    wanted = None if dag_list is None else set(dag_list)
    dags = {dag_name: dag for dag_name, dag in dag_bag.dags.items() if wanted is None or dag_name in wanted}
    if wanted is not None:
        for missing in sorted(wanted - set(dags)):
            sys.stderr.write('DAG not found: {}\n'.format(missing))

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    for dag_name, dag in dags.items():
        output_file = os.path.join(output_dir, dag_name) if output_dir else None
        dumper = AttrDumper([dag],
                            # only interested in attributes from DAGs and Operators
                            drill_classes=[DAG, BaseOperator],
                            ignore_attrs=[
                                # constantly-changing timestamp. Remove for comparison's sake
                                'last_loaded',
                                # inconsistent ordering in Resources repr. Remove for comparison's sake
                                'resources',
                                # static methods in DAG class. don't need to see em
                                'deactivate_stale_dags', 'deactivate_unknown_dags', 'get_num_task_instances'
                            ],
                            # class methods don't have useful values to check
                            ignore_classes=[types.MethodType],
                            # these DAG attrs are just subsets of `tasks`, so just show them without drilling in
                            no_drill_attrs=['active_tasks', 'downstream_list', 'roots', 'task_dict', 'upstream_list'],
                            dump_fn_code=True,
                            output_file=output_file)
        dumper.run()
if __name__ == '__main__':
    # Command-line entry point: read the options and dump the selected DAGs.
    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    arg_parser.add_argument(
        '-d', '--dags-dir',
        help='Dags directory (in container)',
        default='/opt/airflow/dags',
    )
    arg_parser.add_argument(
        '-l', '--dag-list',
        help='List of dags (by name) to dump',
        nargs='+',
        default=None,
    )
    arg_parser.add_argument(
        '-o', '--output-dir',
        help='Directory to send results to. Defaults to stdout if not provided',
        default=None,
    )
    cli = arg_parser.parse_args()
    run(dags_dir=cli.dags_dir, dag_list=cli.dag_list, output_dir=cli.output_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment