-
-
Save cr3a7ure/785f4bad48fe5b29c93fc8b7397ea722 to your computer and use it in GitHub Desktop.
Dump DAG definitions to file/stdout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Script to print DAG+Task information in a text format. This can be used to quickly compare against other branches | |
or versions to confirm the "compiled" version of a DAG is matching expectations. | |
Usage: | |
1. ~/airflow-install (branch) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_branch | |
2. ~/airflow-install (branch) $ git checkout development | |
3. ~/airflow-install (development) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_dev | |
4. Run comparison against the 2 output files | |
""" | |
import argparse
import inspect
import os
import re
import sys
import types

from airflow.models import DagBag
from airflow.models import DAG
from airflow.models import BaseOperator
# Number of spaces added per nesting (drill) level when writing the dump.
INDENT_SIZE = 2
class AttrDumper:
    """
    Recursively dump objects' public attributes as indented ``key = value`` lines.

    Collections are sorted by ``repr`` and memory addresses are stripped from reprs, so that dumps produced
    from different code versions can be diffed without noise.
    """

    # matches the " at 0x7f..." part of default object reprs such as "<Foo object at 0x7f3a2c>"
    _ADDRESS_RE = re.compile(r' at 0x[0-9a-fA-F]+>')

    def __init__(self,
                 objects,
                 drill_classes=None,
                 max_levels=5,
                 ignore_attrs=None,
                 ignore_classes=None,
                 no_drill_attrs=None,
                 dump_fn_code=False,
                 dump_methods_for_classes=None,
                 collapse_multiline_str=False,
                 output_file=None):
        """
        @param objects: list of objects to dump and drill into
        @param drill_classes: only objects that are instances of these classes will be drilled into
        @param max_levels: maximum number of attribute levels to drill into
        @param ignore_attrs: if this attribute name is encountered, skip it completely (it is never evaluated)
        @param ignore_classes: if an object with this class is encountered, skip it completely
        @param no_drill_attrs: if this attribute name is encountered, dump but do not drill
        @param dump_fn_code: boolean whether or not to dump the source code of function attributes
        @param dump_methods_for_classes: list of classes that the source code of class methods should be dumped
        @param collapse_multiline_str: boolean whether or not to remove newlines when dumping string values
        @param output_file: file to send output to. If None, send to stdout
        """
        self.objects = objects
        self.drill_classes = drill_classes
        self.max_levels = max_levels
        self.ignore_attrs = ignore_attrs or []
        self.ignore_classes = ignore_classes or []
        self.no_drill_attrs = no_drill_attrs or []
        self.dump_fn_code = dump_fn_code
        self.dump_methods_for_classes = dump_methods_for_classes or []
        self.collapse_multiline_str = collapse_multiline_str
        self.output_file = output_file
        # ids of objects already drilled into (prevents infinite recursion on reference cycles)
        self.obj_seen = set()
        # Strong references to every object whose id is in obj_seen. Without these, a temporary object (e.g. one
        # built fresh by a property) could be garbage-collected, a later *different* object could reuse its id,
        # and that object would then be wrongly skipped as "already dumped".
        self._seen_refs = []
        self.file_handle = None

    def run(self):
        """
        Execute the dump/drill process for all objects, in ``repr`` order.

        The output file (if any) is closed even when dumping raises.
        """
        if self.output_file:
            # explicit encoding so dumps are identical regardless of the machine's locale
            self.file_handle = open(self.output_file, 'w', encoding='utf-8')
        else:
            self.file_handle = sys.stdout
        try:
            for obj in sorted(self.objects, key=repr):
                self._dump(obj)
        finally:
            if self.file_handle is not sys.stdout:
                self.file_handle.close()
            self.obj_seen = set()
            self._seen_refs = []

    def _dump(self, obj=None, level=0, key=''):
        """
        Print (to file or stdout) the current object and conditionally/recursively drill into that object's attributes.

        @param obj: object to dump and then drill into
        @param level: depth level of attribute drilling
        @param key: the key or name of the incoming object
        """
        self._print(level, key, obj)
        # do not further explore attrs if:
        #   obj is primitive type
        #   this object has already been dumped
        #   recurse level > max
        #   this object type is not white-listed to explore
        #   this attribute is specified to not drill into
        if (isinstance(obj, (int, float, str, list, dict, set))
                or id(obj) in self.obj_seen
                or level > self.max_levels
                or (self.drill_classes and not isinstance(obj, tuple(self.drill_classes)))
                or key in self.no_drill_attrs):
            return
        self.obj_seen.add(id(obj))
        self._seen_refs.append(obj)

        for attr in dir(obj):
            # Skip by name *before* evaluating the attribute, so ignored/private properties (which may have side
            # effects or raise) are never touched.
            if attr in self.ignore_attrs or attr.startswith('_'):
                continue
            try:
                val = getattr(obj, attr)
            except Exception as err:
                # one broken property should not abort the whole dump; record the failure in the output instead
                self._dump('<getattr raised {}: {}>'.format(type(err).__name__, err), level + 1, attr)
                continue
            if self.ignore_classes and isinstance(val, tuple(self.ignore_classes)):
                continue

            if isinstance(val, (str, int, float)):
                self._dump(val, level + 1, attr)
            elif isinstance(val, (list, set)):
                # show the container type, then each element (sorted for a stable order) one level deeper
                self._dump(str(type(val)), level + 1, attr)
                if attr not in self.no_drill_attrs:
                    for i, item in enumerate(sorted(val, key=repr)):
                        self._dump(item, level + 2, '{}[{}]'.format(attr, i))
            elif isinstance(val, dict):
                self._dump(str(type(val)), level + 1, attr)
                if attr not in self.no_drill_attrs:
                    for k, v in sorted(val.items(), key=lambda kv: repr(kv[0])):
                        self._dump(v, level + 2, '{}[{}]'.format(attr, k))
            elif isinstance(val, types.FunctionType):
                if self.dump_fn_code:
                    self._dump(self._source_or_value(val), level + 1, attr)
                else:
                    self._dump(val, level + 1, attr)
            elif isinstance(val, types.MethodType):
                if isinstance(val.__self__, tuple(self.dump_methods_for_classes)):
                    self._dump(self._source_or_value(val), level + 1, attr)
                else:
                    self._dump(val, level + 1, attr)
            else:  # class object
                self._dump(val, level + 1, attr)

    @staticmethod
    def _source_or_value(fn):
        """
        Return the source code of ``fn``, or ``fn`` itself when its source cannot be retrieved.

        ``inspect.getsource`` raises OSError when the source is unavailable (e.g. code created via exec) and
        TypeError for objects it cannot inspect.
        """
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            return fn

    def _print(self, level, key, value):
        """
        Format and print a key/value.

        @param level: drill depth
        @param key: key to print
        @param value: value to print
        """
        # keep on one line, sorted
        if isinstance(value, str):
            if self.collapse_multiline_str:
                value_print = value.replace('\n', '\\n')
            else:
                # indent continuation lines one level deeper than the key they belong to
                value_print = value.replace('\n', '\n' + (level + 1) * INDENT_SIZE * ' ')
        elif isinstance(value, (list, set)):
            value_print = repr(sorted(value, key=repr))
        elif isinstance(value, dict):
            value_print = repr(sorted(value.items(), key=lambda kv: repr(kv[0])))
        else:
            value_print = repr(value)
        # Remove ids from class reprs to avoid comparison false-positives. Every occurrence is stripped, including
        # several inside one container repr (e.g. "[<A at 0x1>, <B at 0x2>]" -> "[<A>, <B>]").
        value_print = self._ADDRESS_RE.sub('>', value_print)
        # dump it
        self.file_handle.write('{}{}{}\n'.format(
            level * INDENT_SIZE * ' ',
            key + ' = ' if key else '',
            value_print))
def run(dags_dir, dag_list, output_dir):
    """
    Run through all specified dags and dump them. The parameters are tweaked to limit the results to avoid noise in
    the resulting output.

    DAG import errors and requested dag ids that were not found are reported on stderr, so that a missing dump is
    not mistaken for an intentional removal when comparing outputs.

    @param dags_dir: where to find the dags
    @param dag_list: specific list of dag ids to dump. If None, dump all
    @param output_dir: directory to write one file per dag into (created if missing). If None, send to stdout
    """
    dag_bag = DagBag(dags_dir)
    # a DAG file that fails to import simply does not appear in dag_bag.dags; surface the failure
    for filepath, error in sorted(dag_bag.import_errors.items()):
        sys.stderr.write('Import error in {}:\n{}\n'.format(filepath, error))

    wanted = set(dag_list) if dag_list is not None else None
    dags = {dag_name: dag for dag_name, dag in dag_bag.dags.items() if wanted is None or dag_name in wanted}
    if wanted is not None:
        for missing in sorted(wanted - set(dags)):
            sys.stderr.write('DAG not found in {}: {}\n'.format(dags_dir, missing))

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # sorted so stdout output is in the same order on every branch, independent of DAG parse order
    for dag_name, dag in sorted(dags.items()):
        output_file = os.path.join(output_dir, dag_name) if output_dir else None
        dumper = AttrDumper([dag],
                            # only interested in attributes from DAGs and Operators
                            drill_classes=[DAG, BaseOperator],
                            ignore_attrs=[
                                # constantly-changing timestamp. Remove for comparison's sake
                                'last_loaded',
                                # inconsistent ordering in Resources repr. Remove for comparison's sake
                                'resources',
                                # static methods in DAG class. don't need to see em
                                'deactivate_stale_dags', 'deactivate_unknown_dags', 'get_num_task_instances'
                            ],
                            # class methods don't have useful values to check
                            ignore_classes=[types.MethodType],
                            # these DAG attrs are just subsets of `tasks`, so just show them without drilling in
                            no_drill_attrs=['active_tasks', 'downstream_list', 'roots', 'task_dict', 'upstream_list'],
                            dump_fn_code=True,
                            output_file=output_file)
        dumper.run()
if __name__ == '__main__':
    # Command-line entry point: parse the options below and dump the selected DAGs.
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cli.add_argument(
        '-d', '--dags-dir',
        default='/opt/airflow/dags',
        help='Dags directory (in container)',
    )
    cli.add_argument(
        '-l', '--dag-list',
        nargs='+',
        default=None,
        help='List of dags (by name) to dump',
    )
    cli.add_argument(
        '-o', '--output-dir',
        default=None,
        help='Directory to send results to. Defaults to stdout if not provided',
    )
    options = cli.parse_args()
    run(dags_dir=options.dags_dir, dag_list=options.dag_list, output_dir=options.output_dir)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment