Last active
May 15, 2020 05:13
-
-
Save cicdw/30f9c71642d5158bee3cf0031b3686e1 to your computer and use it in GitHub Desktop.
Custom Prefect Trigger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def aggregation_skipped_trigger(upstream_states: Dict[Edge, State]) -> bool: | |
""" | |
Custom trigger which trigger fails only if the aggregation task | |
specifically was skipped. | |
""" | |
# upstream_states is a dictionary of _all_ upstream dependencies | |
# but this task will only care about the aggregation task specifically | |
agg_state = [ | |
state | |
for edge, state in upstream_states.items() | |
if edge.upstream_task.name == "aggregation" | |
] | |
if agg_state and agg_state[0].is_skipped(): | |
raise TRIGGERFAIL("Aggregation Task skipped, cannot proceed.") | |
elif not all(s.is_successful() for s in upstream_states.values()): | |
raise TRIGGERFAIL("Some upstream tasks did not succeed, cannot proceed.") | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment