Skip to content

Instantly share code, notes, and snippets.

@gryBox
Created July 17, 2020 19:24
Show Gist options
  • Save gryBox/da862d89ad0df49c02cc9b86e334b23c to your computer and use it in GitHub Desktop.
Save gryBox/da862d89ad0df49c02cc9b86e334b23c to your computer and use it in GitHub Desktop.
merge flows
def build_a_flow(
flow: Flow,
flows_to_add: list=[],
params_to_remove: dict={},
visualize: bool=False
):
"""
A function to build a flow using update and merge
"""
# Flows to add
if flows_to_add:
for fl in flows_to_add:
flow.update(fl, merge_parameters=True)
# Get rid of middle params
if params_to_remove:
remove_middle_params(
flow1=flow,
params_to_remove=params_to_remove
)
if visualize == True:
flow.visualize()
def remove_middle_params(flow1: "Flow", params_to_remove: dict) -> Flow:
"""
Removes a parameter from a flow and reassigns its downstream task to a new upstream task.
This is useful after merging two flows where
a parameter was only acting as conduit between two disparate flows.
Args:
- G (Flow): The flow to manipulate.
- params_to_remove ({"<param_name>: **kwargs}): A dictionary of `{key: {value}}`
pairs. Where the `key` is the parameter to remove and
the value` is a dictionary of argument to pass `flow.get_tasks()`.
Returns:
- U (Flow): A new flow with re-defined edges and extraneous parameters` removed.
Raises:
- ValueError: If the parameter was not found or upstream task was not found in flow tasks.
- ValueError: If the upstream task is not uniquely identified.
"""
# flow1 = copy.deepcopy(flow)
for param_name, upstream_id_attr in params_to_remove.items():
# Find the parameter to remove in the flow
middle_param = flow1.get_tasks(name=param_name, task_type=Parameter)
# Find the upstream task to task to remove in the flow
upstream_task = flow1.get_tasks(**upstream_id_attr)
if not middle_param:
raise ValueError(f"Parameter {param_name} was not found in Flow {flow1}")
elif not upstream_task:
raise ValueError(
f"Upstream task: {upstream_task} was not found in Flow {flow1}"
)
# elif len(upstream_task) > 1:
# raise ValueError(
# f"{upstream_task[0].name} was not uniquely identified in Flow {flow1}"
# )
param_to_replace = middle_param[0]
upstream = upstream_task[0]
# Get edges to replace from the paramater i.e. affected edges
edges_to_replace = flow1.edges_from(param_to_replace)
# Loop through edges and set new edges
for edge in edges_to_replace:
# Define downstream task
downstream = edge.downstream_task
flow1.add_edge(
upstream_task=upstream,
downstream_task=downstream,
key=edge.key,
mapped=edge.mapped,
validate=False,
)
print(f"Removing old edge {edge}")
flow1.edges.remove(edge)
# Remove middle param
flow1.tasks.remove(param_to_replace)
from utilities.generate_keys import parse_input_fl
from utilities.flow_operators import build_a_flow
params_to_remove={
"term": {"name": "get-term-value"},
"node_id_kvs": {"tags": ["node_id_kvs"]
}
}
build_a_flow(
flow=get_primary_sources_fl,
flows_to_add=[parse_input_fl],
params_to_remove=params_to_remove,
visualize=True
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment