Last active
October 18, 2024 19:07
-
-
Save skrawcz/01c46f874c46efc5159c671608781b0d to your computer and use it in GitHub Desktop.
Burr for workflow management of science work
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
from IPython.display import Image, display | |
from IPython.core.display import HTML | |
from burr.core import ApplicationBuilder, State, default, graph, when | |
from burr.core.action import action | |
from burr.tracking import LocalTrackingClient | |
@action(reads=[], writes=["output_folder"]) | |
def task1(state: State, starting_folder: str) -> State: | |
# set folder | |
# run hamilton dag 1 | |
# write what folder was used | |
return state | |
@action(reads=["output_folder"], writes=["output_folder"]) | |
def task2(state: State) -> State: | |
# use prior output folder | |
# run hamilton dag 2 | |
# write results to output folder | |
return state | |
@action(reads=["output_folder"], writes=["output_folder"]) | |
def parallel_task(state: State) -> State: | |
# for each combination | |
# create hamilton DAG and run - map; run task 3 & 4 | |
# aggregegate results | |
return state | |
@action(reads=["output_folder", "workplan"], writes=["output_folder", "should_loop"]) | |
def loop_task(state: State, combinations: list) -> State: | |
# if not workplan - for each combination write out workplan -- loop through it | |
# if workplan, take task off and run | |
# create hamilton DAG and run - map; run task 3 & 4 | |
# aggregegate results | |
# is there more work? y/n -> write to should_loop | |
return state | |
@action(reads=["output_folder"], writes=["output_folder"]) | |
def task5(state: State) -> State: | |
# run hamilton dag 5 | |
return state | |
def build_graph(): | |
roys_graph = ( | |
graph.GraphBuilder() | |
.with_actions( | |
# these are the "nodes" | |
task1=task1, task2=task2, loop_task=loop_task, task5=task5 | |
) | |
.with_transitions( | |
# these are the edges between nodes, based on state. | |
("task1", "task2", default), | |
# ("task2", "parallel_task", default), | |
("task2", "loop_task", default), | |
# ("parallel_task", "task5", default), | |
("loop_task", "loop_task", when(should_loop=True)), | |
("loop_task", "task5", default), | |
) | |
.build() | |
) | |
return roys_graph | |
def build_application(roys_graph): | |
roy_tracker = LocalTrackingClient(project="roy_demo") | |
roys_app = ( | |
ApplicationBuilder() | |
.with_graph(roys_graph) # this could be different... | |
.initialize_from( | |
roy_tracker, | |
resume_at_next_action=True, | |
default_state={"set_defaults_here": []}, # can set default state for the entire run... | |
default_entrypoint="task1", | |
) | |
.with_tracker(roy_tracker, use_otel_tracing=True) | |
# .with_state_persister(persister) | |
.build() | |
) | |
return roys_app | |
if __name__ == "__main__": | |
graph = build_graph() | |
app = build_application(graph) | |
last_action, action_result, app_state = app.run( | |
halt_after=["task5"], | |
inputs={...} | |
) | |
print(...) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment