As of 1.1.7, Dagster supports assets that depend on earlier partitions of themselves. One example is an asset that implements a differential equation, where each step is computed from the previous one.
To run this example, first install the dependencies:

    pip install dagster dagit

Then, in one shell, create a Dagster home directory and start Dagit (`DAGSTER_HOME` must be an absolute path):

    mkdir dagster-home
    export DAGSTER_HOME=$(pwd)/dagster-home
    dagit -f diffeq.py

In a second shell, from the same directory, start the Dagster daemon, which runs schedules and sensors:

    export DAGSTER_HOME=$(pwd)/dagster-home
    dagster-daemon run -f diffeq.py

Once both are running, either:

- Select the asset and materialize all of the missing partitions. As of 1.1.7, you'll need to run each partition manually, in sequence. In the future, Dagster will support sequential backfills automatically.
- After the backfill is complete, turn on the schedule to see the asset computed once a minute.

OR

- Turn on the asset reconciliation sensor. At first, on each tick, the sensor will fill in the missing asset partitions. Once it has caught up, the sensor will begin to run the new partitions.
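Because each partition reads the one before it, missing partitions have to be filled in from the earliest onward: every partition before the earliest missing one is already materialized, so its input is available. The pure-Python sketch below models that ordering, assuming one run per tick. It does not call Dagster, and `minute_partitions`, `next_to_run`, and `catch_up` are hypothetical names.

```python
from datetime import datetime, timedelta


def minute_partitions(start, end):
    """Hypothetical helper: start times of one-minute partitions in [start, end)."""
    keys = []
    t = start
    while t < end:
        keys.append(t)
        t += timedelta(minutes=1)
    return keys


def next_to_run(partitions, materialized):
    """Return the earliest missing partition, or None if all are materialized.

    Every partition before it is materialized, so its predecessor (if any)
    is available and up to date when it runs.
    """
    for p in partitions:
        if p not in materialized:
            return p
    return None


def catch_up(partitions, materialized):
    """Simulate ticks that each run one partition; return the run order."""
    order = []
    while True:
        p = next_to_run(partitions, materialized)
        if p is None:
            break
        materialized.add(p)  # pretend the run for this partition succeeded
        order.append(p)
    return order


start = datetime(2023, 1, 1, 0, 0)
parts = minute_partitions(start, start + timedelta(minutes=5))
done = {parts[0]}
print([p.strftime("%H:%M") for p in catch_up(parts, done)])
# → ['00:01', '00:02', '00:03', '00:04']
```

Running partitions out of order (for example, 00:03 before 00:02) would mean reading a predecessor that does not exist yet, which is why the manual backfill above has to go one partition at a time.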
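The self-dependency is represented using a `TimeWindowPartitionMapping` with `start_offset=-1` and `end_offset=-1`, which maps each partition to the partition immediately before it: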
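
    from dagster import AssetIn, TimeWindowPartitionMapping, asset

    # minute_partition (a minute-granularity partitions definition) is
    # defined elsewhere in the example.
    @asset(
        partitions_def=minute_partition,
        ins={
            # Depend on this same asset, shifted back by one partition:
            # partition N reads the value materialized for partition N - 1.
            "velocity_by_minute": AssetIn(
                partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
            )
        },
    )
    def velocity_by_minute(velocity_by_minute):
        ...
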
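To make the offsets concrete, here is a small pure-Python sketch of the window arithmetic that `start_offset=-1, end_offset=-1` implies for one-minute partitions. It is an illustration under that assumption, not Dagster's implementation. `upstream_window` and its parameters are hypothetical, and treating a window that starts before the first partition as having no upstream mirrors the `None` the asset receives on its first partition.

```python
from datetime import datetime, timedelta


def upstream_window(
    window,
    first_partition_start,
    start_offset=-1,
    end_offset=-1,
    partition_length=timedelta(minutes=1),
):
    """Hypothetical helper: shift a partition's [start, end) window by whole partitions."""
    start, end = window
    shifted = (
        start + start_offset * partition_length,
        end + end_offset * partition_length,
    )
    if shifted[0] < first_partition_start:
        # No partition exists before the first one, so there is nothing upstream.
        return None
    return shifted


first = datetime(2023, 1, 1, 0, 0)
minute = timedelta(minutes=1)
# Partition 00:05-00:06 reads from partition 00:04-00:05.
print(upstream_window((first + 5 * minute, first + 6 * minute), first))
# The first partition has no predecessor.
print(upstream_window((first, first + minute), first))
```

Shifting both ends by the same offset keeps the upstream window exactly one partition wide, so each partition depends on a single predecessor rather than a range.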
The asset itself should handle the case where the first partition is being run. There is no prior partition then, so the self-dependent input will be `None`:
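
    from dagster import Output

    # Inside velocity_by_minute: the first partition has no predecessor.
    if velocity_by_minute is None:
        # Start with velocity 0 at a random position; random_position()
        # is a helper defined elsewhere in the example.
        return Output(
            (0, random_position()),
            metadata={"velocity": 0},
        )
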
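Because the first partition will be passed `None`, assets using self-dependencies should not put a type annotation on the self-dependent input: on the first run, the value would not match the annotated type.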
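The same pattern, a base case for `None` followed by a step from the previous value, can be sketched without Dagster. The update rule below is a hypothetical stand-in (a unit spring integrated with semi-implicit Euler steps), because the example's actual equation isn't shown here. `compute_partition`, `dt`, and the fixed `start_position` that replaces `random_position()` are all assumptions.

```python
def compute_partition(prev, start_position=1.0, dt=0.1):
    """Compute one partition's (velocity, position) from the previous partition's."""
    if prev is None:
        # First partition: nothing upstream, so start at rest.
        # (The example uses random_position(); a fixed value keeps this deterministic.)
        return (0.0, start_position)
    velocity, position = prev
    # Hypothetical dynamics: acceleration = -position (a unit spring).
    # Update velocity first, then position with the new velocity (semi-implicit Euler).
    velocity = velocity - position * dt
    position = position + velocity * dt
    return (velocity, position)


# Run five partitions in order, feeding each one the previous result.
state = None
for minute in range(5):
    state = compute_partition(state)
    print(minute, state)
```

Each call only needs the immediately preceding state, which is exactly what the one-partition offset in the mapping provides.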
What the sensor looks like as it backfills the missing partitions in sequence and then begins running new partitions:
The results: