
@slopp
Last active December 19, 2022 16:55
Diff Eqs in Dagster

Self-Dependent Asset Partitions

As of version 1.1.7, Dagster supports assets that depend on earlier partitions of themselves. One example is an asset that implements a differential equation, where the value at each time step is computed from the value at the previous step.
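Conceptually, a self-dependent asset behaves like a discrete recurrence: each partition's value is computed from the previous partition's value, and the first partition has nothing to look back on. The plain-Python sketch below involves no Dagster. It mimics the velocity asset defined later in this gist, with the random positions replaced by a fixed list so the result is reproducible. The names `step` and `run_in_sequence` are hypothetical and only illustrate the idea.

```python
from typing import List, Optional, Tuple

# (velocity, position) pair, mirroring the tuple the Dagster asset returns
State = Tuple[int, int]


def step(previous: Optional[State], position: int) -> State:
    """Compute one partition from the previous partition's output."""
    if previous is None:
        # First partition: no prior position exists, so velocity starts at 0
        return (0, position)
    _, last_position = previous
    return (position - last_position, position)


def run_in_sequence(positions: List[int]) -> List[State]:
    """Evaluate partitions strictly in order, feeding each output forward."""
    outputs: List[State] = []
    previous: Optional[State] = None
    for position in positions:
        previous = step(previous, position)
        outputs.append(previous)
    return outputs


print(run_in_sequence([3, 7, 4, 4]))
# [(0, 3), (4, 7), (-3, 4), (0, 4)]
```

The loop makes the ordering constraint explicit: a partition cannot be computed until the one before it exists.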

Getting Started

To run this example, first install the dependencies:

pip install dagster dagit

Then open a shell:

mkdir dagster-home
export DAGSTER_HOME=$(pwd)/dagster-home
dagit -f diffeq.py

Open a second shell in the same directory:

export DAGSTER_HOME=$(pwd)/dagster-home
dagster-daemon run -f diffeq.py

Once running:

  1. Start by selecting the asset and materializing all of the missing partitions. As of 1.1.7, you need to run each partition manually and in order, because each partition depends on the one before it. In the future, Dagster will support sequential backfills automatically.

  2. After the backfill is complete, turn on the schedule to see the asset computed once a minute.

OR

  1. Turn on the asset reconciliation sensor. At first, on each tick the sensor fills in the missing asset partitions in sequence. Once it has caught up, the sensor begins running new partitions as they become due.
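Because each partition needs its predecessor's output, a missing partition can only run once the partition before it has been materialized, which is why the gaps fill in one after another. The toy simulation below illustrates that ordering constraint only; it is not how Dagster's reconciliation sensor is implemented. It assumes every launched run finishes before the next tick and uses shortened, hypothetical partition keys. `runnable_partitions` and `simulate_ticks` are hypothetical helpers.

```python
from typing import List, Set


def runnable_partitions(keys: List[str], done: Set[str]) -> List[str]:
    """Missing partitions whose predecessor (if any) is already materialized."""
    ready = []
    for i, key in enumerate(keys):
        if key in done:
            continue
        # The first partition has no predecessor; others need the prior key done
        if i == 0 or keys[i - 1] in done:
            ready.append(key)
    return ready


def simulate_ticks(keys: List[str], done: Set[str]) -> List[List[str]]:
    """Launch every runnable partition on each tick until nothing is missing.

    Assumes each launched run completes before the next tick.
    """
    done = set(done)
    history = []
    while True:
        ready = runnable_partitions(keys, done)
        if not ready:
            break
        history.append(ready)
        done.update(ready)
    return history


keys = ["16:51", "16:52", "16:53", "16:54"]
print(simulate_ticks(keys, done=set()))
# [['16:51'], ['16:52'], ['16:53'], ['16:54']]
```

Starting from nothing, only one partition becomes runnable per tick, matching the one-at-a-time catch-up described above.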

Notes on Code

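The self-dependency is declared as an input with the same name as the asset, using a TimeWindowPartitionMapping whose start_offset and end_offset of -1 map each partition to the one immediately before it: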

@asset(
    partitions_def=minute_partition,
    ins={
        "velocity_by_minute": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    },
)
def velocity_by_minute(velocity_by_minute):
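With both offsets set to -1, the mapping shifts the partition's one-minute time window back by one step, so partition t reads partition t - 1. The plain-Python sketch below illustrates that arithmetic for the minute partitions used here. `previous_partition_key` is a hypothetical helper, not part of Dagster's API. It assumes the same fmt string as the example, that the key passed in is a valid partition, and it uses made-up example keys.

```python
import datetime
from typing import Optional

FMT = "%Y-%m-%d %H:%M:%S.%f"  # same fmt string as the partitions definition
STEP = datetime.timedelta(minutes=1)  # "*/1 * * * *": one partition per minute


def previous_partition_key(key: str, start_key: str) -> Optional[str]:
    """Shift a minute partition window back by one step (offset -1).

    Returns None when the shifted window would fall before the first
    partition, which is the case the asset must handle explicitly.
    """
    window_start = datetime.datetime.strptime(key, FMT)
    first = datetime.datetime.strptime(start_key, FMT)
    upstream = window_start - STEP
    if upstream < first:
        return None
    return upstream.strftime(FMT)


first = "2022-12-19 16:51:00.000000"
print(previous_partition_key("2022-12-19 16:53:00.000000", first))
# 2022-12-19 16:52:00.000000
print(previous_partition_key(first, first))
# None
```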

The asset itself must handle the case where the first partition is being run. There is no prior partition, so the input will be None:

if velocity_by_minute is None:
    return Output(
        (0, random_position()),
        metadata={"velocity": 0},
    )

Because the first partition is passed None, assets with self-dependencies should not add a type annotation to the self-dependent input.

Full code (diffeq.py):

import datetime
import random

from dagster import (
    AssetIn,
    AssetSelection,
    Definitions,
    Output,
    TimeWindowPartitionMapping,
    TimeWindowPartitionsDefinition,
    asset,
    build_asset_reconciliation_sensor,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# Start the partitions five minutes in the past so there are missing
# partitions to fill in when the example starts. Formatting with the same
# fmt string guarantees the start string always parses.
start_datetime_str = (
    datetime.datetime.utcnow() - datetime.timedelta(minutes=5)
).strftime("%Y-%m-%d %H:%M:%S.%f")

# One partition per minute.
minute_partition = TimeWindowPartitionsDefinition(
    start=start_datetime_str, cron_schedule="*/1 * * * *", fmt="%Y-%m-%d %H:%M:%S.%f"
)


@asset(
    partitions_def=minute_partition,
    ins={
        # Self-dependency: each partition reads the partition one minute earlier.
        "velocity_by_minute": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    },
)
def velocity_by_minute(velocity_by_minute):
    """Calculates velocity using current and last position"""
    if velocity_by_minute is None:
        # First partition: there is no prior partition to read from.
        print("OOPS got a none input")
        return Output(
            (0, random_position()),
            metadata={"velocity": 0},
        )
    # The prior partition's output is a (velocity, position) tuple.
    _, last_position = velocity_by_minute
    current_position = random_position()
    current_velocity = current_position - last_position
    return Output(
        (current_velocity, current_position),
        metadata={
            "velocity": current_velocity,
            "last_position": last_position,
            "current_position": current_position,
        },
    )


def random_position():
    """Returns a random integer position between 1 and 19."""
    return random.choice(list(range(1, 20)))


# Sensor that launches runs for missing or stale asset partitions.
reconciler = build_asset_reconciliation_sensor(
    name="reconciler",
    asset_selection=AssetSelection.all(),
)

defs = Definitions(
    assets=[velocity_by_minute],
    schedules=[
        # Runs the newest minute partition on each schedule tick.
        build_schedule_from_partitioned_job(
            define_asset_job(
                name="velocity_by_minute_job",
                selection="velocity_by_minute",
                partitions_def=minute_partition,
            ),
            name="velocity_by_minute_schedule",
        )
    ],
    sensors=[reconciler],
)

slopp commented Dec 19, 2022

What the sensor looks like as it backfills the missing partitions in sequence and then begins running new partitions:

[Image: sensor]

The results:

[Image: results]
