Created
July 17, 2020 19:24
-
-
Save gryBox/da862d89ad0df49c02cc9b86e334b23c to your computer and use it in GitHub Desktop.
merge flows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def build_a_flow(
    flow: "Flow",
    flows_to_add: list = None,
    params_to_remove: dict = None,
    visualize: bool = False,
):
    """
    Build a flow by merging other flows into it and pruning bridging parameters.

    The work is done on `flow` itself (it is modified in place); nothing is
    returned.

    Args:
        - flow (Flow): The flow to build on.
        - flows_to_add (list, optional): Flows to merge into `flow`, in order.
            Each one is passed to `flow.update(..., merge_parameters=True)`.
            `None` or an empty list skips the merge step.
        - params_to_remove (dict, optional): Forwarded unchanged to
            `remove_middle_params` as its `params_to_remove` argument.
            `None` or an empty dict skips the pruning step.
        - visualize (bool, optional): When truthy, `flow.visualize()` is called
            after the merge and pruning steps.
    """
    # `None` defaults replace the former mutable `[]` / `{}` defaults; both are
    # falsy, so callers relying on the defaults see the same behavior.
    if flows_to_add:
        for fl in flows_to_add:
            flow.update(fl, merge_parameters=True)

    # Get rid of parameters that only connected the merged flows
    if params_to_remove:
        remove_middle_params(
            flow1=flow,
            params_to_remove=params_to_remove,
        )

    if visualize:
        flow.visualize()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def remove_middle_params(flow1: "Flow", params_to_remove: dict) -> "Flow":
    """
    Removes parameters from a flow and reassigns their downstream tasks to a new upstream task.

    This is useful after merging two flows where
    a parameter was only acting as conduit between two disparate flows.

    Args:
        - flow1 (Flow): The flow to manipulate. It is modified in place.
        - params_to_remove ({"<param_name>": {**kwargs}}): A dictionary of
            `{key: value}` pairs, where the `key` is the name of the parameter
            to remove and the `value` is a dictionary of keyword arguments to
            pass to `flow1.get_tasks()` to locate the replacement upstream task.

    Returns:
        - Flow: The same `flow1` object, with re-defined edges and the
            extraneous parameters removed.

    Raises:
        - ValueError: If the parameter or the upstream task was not found in
            the flow's tasks.

    Note:
        Uniqueness of the upstream task is not enforced: when the lookup
        matches several tasks, the first one returned by `get_tasks()` is used.
    """
    for param_name, upstream_id_attr in params_to_remove.items():
        # Find the parameter to remove in the flow
        middle_param = flow1.get_tasks(name=param_name, task_type=Parameter)

        # Find the task that will take the parameter's place as upstream
        upstream_task = flow1.get_tasks(**upstream_id_attr)

        if not middle_param:
            raise ValueError(f"Parameter {param_name} was not found in Flow {flow1}")
        elif not upstream_task:
            # Report the lookup criteria; the lookup result is empty here and
            # would tell the caller nothing.
            raise ValueError(
                f"Upstream task: {upstream_id_attr} was not found in Flow {flow1}"
            )

        param_to_replace = middle_param[0]
        upstream = upstream_task[0]

        # Edges leaving the parameter, i.e. the affected edges. Snapshot them so
        # that removing edges below cannot disturb the iteration.
        edges_to_replace = list(flow1.edges_from(param_to_replace))

        # Re-create each affected edge from the new upstream task
        for edge in edges_to_replace:
            flow1.add_edge(
                upstream_task=upstream,
                downstream_task=edge.downstream_task,
                key=edge.key,
                mapped=edge.mapped,
                validate=False,
            )
            print(f"Removing old edge {edge}")
            flow1.edges.remove(edge)

        # Remove middle param
        flow1.tasks.remove(param_to_replace)

    return flow1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from utilities.generate_keys import parse_input_fl | |
from utilities.flow_operators import build_a_flow | |
# Each key names a parameter to drop after the merge; each value holds the
# `get_tasks()` keyword arguments used to pick the task that replaces it as
# the upstream of the parameter's downstream tasks.
params_to_remove = dict(
    term=dict(name="get-term-value"),
    node_id_kvs=dict(tags=["node_id_kvs"]),
)

# Merge the input-parsing flow into the primary-sources flow, prune the
# bridging parameters listed above, and visualize the result.
build_a_flow(
    get_primary_sources_fl,
    flows_to_add=[parse_input_fl],
    params_to_remove=params_to_remove,
    visualize=True,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment