Created
August 12, 2024 20:36
-
-
Save jonathanabila/5e8d412b22f6543730df07013b480eda to your computer and use it in GitHub Desktop.
Dynamic Output with regular fain in
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Generator | |
from dagster import op, job, DynamicOut, DynamicOutput, Out, Dict, graph, Output | |
@op(out=DynamicOut(Dict)) | |
def generate_non_failing(): | |
""" | |
generate_non_failings a set of dynamic outputs to map to our subgraph_non_failing | |
""" | |
for i in range(10): | |
yield DynamicOutput({"name": f'Item: {i}', "value": i+1}, mapping_key=str(i)) | |
@op(out={"name": Out(str), "value": Out(int)}) | |
def unpack_non_failing(context, packed, dependency): | |
""" | |
unpack_non_failings the DynamicOutput into the tupled output | |
Can this be simplified? Or is there a better way to do this via the core API? | |
""" | |
context.log.info(f'unpack_non_failinging: {packed}') | |
return packed["name"], packed["value"] | |
@op(out={"packed": Out(Dict)}) | |
def pack_non_failing(name, value): | |
""" | |
Pack the tuple back into a single dictionary | |
""" | |
return {"name": name, "value": value} | |
@op( | |
out={ | |
"run": Out(is_required=False), | |
"skip": Out(is_required=False), | |
} | |
) | |
def should_run_unpack( | |
should_run, | |
dependency, | |
) -> Generator: | |
if should_run: | |
yield Output(1, "run") | |
else: | |
yield Output(2, "skip") | |
@graph | |
def subgraph_non_failing(vals, should_run): | |
""" | |
subgraph_non_failing of operations | |
""" | |
run, skip = should_run_unpack(should_run, vals) | |
n, v = unpack_non_failing(vals, [run, skip]) | |
return pack_non_failing(n, v) | |
@job | |
def non_failing_dagster_job(should_run): | |
""" | |
Our job that actually does some work | |
desc = 'Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.' | |
def failed(desc: str) -> NoReturn: | |
if not isinstance(desc, str): | |
raise CheckError("desc argument must be a string") | |
> raise CheckError(f"Failure condition: {desc}") | |
E dagster._check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time. | |
""" | |
generate_non_failing().map(lambda vals: subgraph_non_failing(vals, should_run)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment