Skip to content

Instantly share code, notes, and snippets.

@skrawcz
Last active October 18, 2024 19:07
Show Gist options
  • Save skrawcz/01c46f874c46efc5159c671608781b0d to your computer and use it in GitHub Desktop.
Save skrawcz/01c46f874c46efc5159c671608781b0d to your computer and use it in GitHub Desktop.
Burr for workflow management of science work
import copy
from IPython.display import Image, display
from IPython.core.display import HTML
from burr.core import ApplicationBuilder, State, default, graph, when
from burr.core.action import action
from burr.tracking import LocalTrackingClient
@action(reads=[], writes=["output_folder"])
def task1(state: State, starting_folder: str) -> State:
# set folder
# run hamilton dag 1
# write what folder was used
return state
@action(reads=["output_folder"], writes=["output_folder"])
def task2(state: State) -> State:
# use prior output folder
# run hamilton dag 2
# write results to output folder
return state
@action(reads=["output_folder"], writes=["output_folder"])
def parallel_task(state: State) -> State:
# for each combination
# create hamilton DAG and run - map; run task 3 & 4
# aggregegate results
return state
@action(reads=["output_folder", "workplan"], writes=["output_folder", "should_loop"])
def loop_task(state: State, combinations: list) -> State:
# if not workplan - for each combination write out workplan -- loop through it
# if workplan, take task off and run
# create hamilton DAG and run - map; run task 3 & 4
# aggregegate results
# is there more work? y/n -> write to should_loop
return state
@action(reads=["output_folder"], writes=["output_folder"])
def task5(state: State) -> State:
# run hamilton dag 5
return state
def build_graph():
roys_graph = (
graph.GraphBuilder()
.with_actions(
# these are the "nodes"
task1=task1, task2=task2, loop_task=loop_task, task5=task5
)
.with_transitions(
# these are the edges between nodes, based on state.
("task1", "task2", default),
# ("task2", "parallel_task", default),
("task2", "loop_task", default),
# ("parallel_task", "task5", default),
("loop_task", "loop_task", when(should_loop=True)),
("loop_task", "task5", default),
)
.build()
)
return roys_graph
def build_application(roys_graph):
roy_tracker = LocalTrackingClient(project="roy_demo")
roys_app = (
ApplicationBuilder()
.with_graph(roys_graph) # this could be different...
.initialize_from(
roy_tracker,
resume_at_next_action=True,
default_state={"set_defaults_here": []}, # can set default state for the entire run...
default_entrypoint="task1",
)
.with_tracker(roy_tracker, use_otel_tracing=True)
# .with_state_persister(persister)
.build()
)
return roys_app
if __name__ == "__main__":
graph = build_graph()
app = build_application(graph)
last_action, action_result, app_state = app.run(
halt_after=["task5"],
inputs={...}
)
print(...)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment