Skip to content

Instantly share code, notes, and snippets.

@jonathanabila
Created August 12, 2024 20:36
Show Gist options
  • Save jonathanabila/5e8d412b22f6543730df07013b480eda to your computer and use it in GitHub Desktop.
Save jonathanabila/5e8d412b22f6543730df07013b480eda to your computer and use it in GitHub Desktop.
Dynamic Output with regular fan in
from typing import Generator
from dagster import op, job, DynamicOut, DynamicOutput, Out, Dict, graph, Output
@op(out=DynamicOut(Dict))
def generate_non_failing():
    """
    Yield ten dynamic outputs to be mapped over the downstream subgraph.

    For each ``i`` in ``range(10)`` a dictionary with the keys ``"name"``
    (``"Item: <i>"``) and ``"value"`` (``i + 1``) is emitted, using
    ``str(i)`` as the mapping key.
    """
    for i in range(10):
        payload = {"name": f'Item: {i}', "value": i + 1}
        yield DynamicOutput(payload, mapping_key=str(i))
@op(out={"name": Out(str), "value": Out(int)})
def unpack_non_failing(context, packed, dependency):
    """
    Split a packed dictionary into separate ``name`` and ``value`` outputs.

    The incoming ``packed`` mapping is logged at info level, then its
    ``"name"`` and ``"value"`` entries are returned as a two-element tuple
    matching the two declared outputs.

    ``dependency`` is not read by the body.
    NOTE(review): presumably it exists only to force an upstream dependency
    in the graph -- confirm.

    Open question from the original author: can this be simplified, or is
    there a better way to do this via the core API?
    """
    context.log.info(f'unpack_non_failinging: {packed}')
    name = packed["name"]
    value = packed["value"]
    return name, value
@op(out={"packed": Out(Dict)})
def pack_non_failing(name, value):
    """
    Combine ``name`` and ``value`` back into a single dictionary.

    Returns a mapping with the keys ``"name"`` and ``"value"``.
    """
    repacked = {"name": name, "value": value}
    return repacked
@op(
    out={
        "run": Out(is_required=False),
        "skip": Out(is_required=False),
    }
)
def should_run_unpack(
    should_run,
    dependency,
) -> Generator:
    """
    Emit exactly one of the two optional outputs, chosen by ``should_run``.

    When ``should_run`` is truthy the value ``1`` is yielded on the ``"run"``
    output; otherwise the value ``2`` is yielded on the ``"skip"`` output.

    ``dependency`` is not read by the body.
    NOTE(review): presumably it exists only to force an upstream dependency
    in the graph -- confirm.
    """
    if not should_run:
        yield Output(2, "skip")
        return
    yield Output(1, "run")
@graph
def subgraph_non_failing(vals, should_run):
    """
    Wire the branch selector, unpack and pack ops into one subgraph.

    ``should_run_unpack`` receives ``should_run`` and ``vals``; both of its
    outputs are passed as a list to the ``dependency`` input of
    ``unpack_non_failing``, whose two results are then recombined by
    ``pack_non_failing``.
    """
    run_branch, skip_branch = should_run_unpack(should_run, vals)
    item_name, item_value = unpack_non_failing(vals, [run_branch, skip_branch])
    return pack_non_failing(item_name, item_value)
@job
def non_failing_dagster_job(should_run):
    """
    Map ``subgraph_non_failing`` over each dynamic output of ``generate_non_failing``.

    ``should_run`` is forwarded unchanged to every mapped subgraph invocation.

    Error recorded by the original author alongside this job:

        desc = 'Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.'
        def failed(desc: str) -> NoReturn:
            if not isinstance(desc, str):
                raise CheckError("desc argument must be a string")
        >   raise CheckError(f"Failure condition: {desc}")
        E   dagster._check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
    """

    def run_subgraph(vals):
        # Applied once per dynamic output produced by generate_non_failing.
        return subgraph_non_failing(vals, should_run)

    generate_non_failing().map(run_subgraph)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment