This example shows how to create an asset graph in which the root assets are materialized on various schedules and the downstream assets are materialized by different auto-materialization policies.

To install:
pip install dagster dagster-webserver
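The split between schedule-driven roots and policy-driven downstreams can be illustrated with a toy model. This is a minimal plain-Python sketch, not Dagster's scheduler: `ToyAsset`, `tick`, and the asset names are hypothetical. It assumes an "eager"-style rule in which a downstream asset runs as soon as any parent is newer than itself.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAsset:
    """Hypothetical stand-in for an asset: its parents and last run tick."""

    name: str
    parents: list = field(default_factory=list)
    last_materialized: int = -1  # -1 means "never materialized"


def tick(assets: dict, scheduled_roots: list, now: int) -> list:
    """Run one logical tick and return the names materialized, in order.

    Scheduled roots run first; then every downstream asset with a parent
    newer than itself runs (a simplified "eager"-style rule).
    `assets` must be ordered topologically (parents before children).
    """
    materialized = []
    for name in scheduled_roots:
        assets[name].last_materialized = now
        materialized.append(name)
    for asset in assets.values():
        if asset.parents and any(
            assets[p].last_materialized > asset.last_materialized
            for p in asset.parents
        ):
            asset.last_materialized = now
            materialized.append(asset.name)
    return materialized


graph = {
    "hourly_root": ToyAsset("hourly_root"),
    "daily_root": ToyAsset("daily_root"),
    "report": ToyAsset("report", parents=["hourly_root", "daily_root"]),
}
print(tick(graph, ["hourly_root"], now=0))  # ['hourly_root', 'report']
print(tick(graph, [], now=1))               # []
print(tick(graph, ["daily_root"], now=2))   # ['daily_root', 'report']
```

In Dagster itself the schedules and policies are declared on the definitions and evaluated by the daemon; the toy only shows why a downstream with two differently scheduled parents runs after either one updates.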
[
  {
    "dag_id": "powerbi1",
    "python_script": "some_script.py",
    "dataset_id": "fdsjl4539fdjsk"
  },
  {
    "dag_id": "powerbi2",
    "python_script": "some_other_script.py",
    "dataset_id": "89fdskfds0"
  }
]
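A config list like this is usually read once at load time and turned into one set of definitions per entry (a factory pattern). The sketch below covers only the parsing step, assuming the file is a JSON list whose entries have exactly the keys `dag_id`, `python_script`, and `dataset_id`; `PipelineSpec` and `load_specs` are hypothetical names.

```python
import json
from typing import List, NamedTuple

# The two entries shown above, inlined so the sketch is self-contained.
RAW_CONFIG = """
[
  {"dag_id": "powerbi1", "python_script": "some_script.py", "dataset_id": "fdsjl4539fdjsk"},
  {"dag_id": "powerbi2", "python_script": "some_other_script.py", "dataset_id": "89fdskfds0"}
]
"""


class PipelineSpec(NamedTuple):
    """Hypothetical record for one config entry."""

    dag_id: str
    python_script: str
    dataset_id: str


def load_specs(raw: str) -> List[PipelineSpec]:
    """Parse the JSON list into specs, rejecting duplicate dag_id values."""
    specs = []
    seen = set()
    for entry in json.loads(raw):
        if entry["dag_id"] in seen:
            raise ValueError(f"duplicate dag_id: {entry['dag_id']}")
        seen.add(entry["dag_id"])
        # Unknown or missing keys raise TypeError here, which surfaces config typos early.
        specs.append(PipelineSpec(**entry))
    return specs


for spec in load_specs(RAW_CONFIG):
    print(spec.dag_id, spec.python_script, spec.dataset_id)
```

A factory could then loop over these specs and create, for example, one asset that runs `python_script` and a downstream one that refreshes `dataset_id`; rejecting duplicate ids up front avoids two entries producing clashing asset names.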
This example shows a skeleton for a Dagster project that extracts tables from SQL Server, stores each extract as a CSV file in GCS, and then loads the GCS extract into BigQuery.
The actual extract and load logic is omitted; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.
First, a pipeline for a single table is created, as demonstrated in the file dagster_mock_one_table.py. To run this example:
pip install dagster dagster-webserver
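The per-table chain (SQL Server extract, then GCS CSV, then BigQuery table) and its repetition across many tables can be sketched as a plain dependency map. This only models the graph shape, not the Dagster asset definitions; the stage suffixes, table names, and the helpers `table_pipeline` and `all_tables` are hypothetical, and the extract/load logic stays omitted as in the original project.

```python
from typing import Dict, Iterable, List

# Hypothetical stage suffixes for the three steps of one table's pipeline.
STAGES = ("sqlserver_extract", "gcs_csv", "bigquery_table")


def table_pipeline(table: str) -> Dict[str, List[str]]:
    """Return {asset_name: [upstream asset names]} for a single table."""
    names = [f"{table}_{stage}" for stage in STAGES]
    graph = {names[0]: []}  # the SQL Server extract has no upstream
    for upstream, downstream in zip(names, names[1:]):
        graph[downstream] = [upstream]
    return graph


def all_tables(tables: Iterable[str]) -> Dict[str, List[str]]:
    """Repeat the single-table pattern for every table (a factory)."""
    graph: Dict[str, List[str]] = {}
    for table in tables:
        graph.update(table_pipeline(table))
    return graph


print(table_pipeline("orders"))
print(sorted(all_tables(["orders", "customers"])))
```

Prefixing every asset name with the table name keeps the per-table chains independent, so adding a table only adds three new nodes.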
profile:
  name: FN
stocks_to_index:
  - ticker: NFLX
  - ticker: META
index_strategy:
  type: equal
forecast:
  days: 60
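An `equal` index strategy presumably gives every ticker in `stocks_to_index` the same weight. A minimal sketch under that assumption follows; `index_weights` and `index_value` are hypothetical helpers, and the prices are illustrative numbers, not real quotes.

```python
from typing import Dict, List


def index_weights(tickers: List[str], strategy: str) -> Dict[str, float]:
    """Return the weight of each ticker in the index."""
    if strategy != "equal":
        raise NotImplementedError(f"unsupported strategy: {strategy}")
    if not tickers:
        raise ValueError("stocks_to_index is empty")
    return {ticker: 1 / len(tickers) for ticker in tickers}


def index_value(prices: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted sum of one day's prices."""
    return sum(weights[t] * prices[t] for t in weights)


weights = index_weights(["NFLX", "META"], "equal")
print(weights)  # {'NFLX': 0.5, 'META': 0.5}
print(index_value({"NFLX": 400.0, "META": 300.0}, weights))  # 350.0
```

Raising on unknown strategies keeps a typo in `index_strategy.type` from silently falling back to equal weighting.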
with events as (
  select
    DATE_FORMAT(timestamp, '%Y-%m') as event_month,
    dagster_event_type,
    -- concat (not coalesce) joins run_id and step_key into one step identifier
    concat(run_id, '||', step_key) as step_id,
    count(1) as credits
  from event_logs
  where dagster_event_type = 'STEP_START'
  group by event_month, dagster_event_type, step_id
)
select * from events
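The query counts step starts by month, using `run_id` plus `step_key` as the step identifier (`coalesce` would return only `run_id`, so concatenation is what builds that identifier). The same idea can be tried locally with SQLite, where `strftime` replaces MySQL's `DATE_FORMAT` and `||` performs concatenation. This sketch assumes an `event_logs` table with only the four columns used above and that each distinct step start counts as one credit; the rows are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table event_logs "
    "(timestamp text, dagster_event_type text, run_id text, step_key text)"
)
# Made-up sample rows for illustration only.
rows = [
    ("2023-06-01 10:00:00", "STEP_START", "run1", "extract"),
    ("2023-06-01 10:05:00", "STEP_START", "run1", "load"),
    ("2023-06-01 10:05:00", "STEP_SUCCESS", "run1", "load"),
    ("2023-07-02 09:00:00", "STEP_START", "run2", "extract"),
]
conn.executemany("insert into event_logs values (?, ?, ?, ?)", rows)

query = """
select strftime('%Y-%m', timestamp) as event_month,
       count(distinct run_id || '||' || step_key) as credits
from event_logs
where dagster_event_type = 'STEP_START'
group by event_month
order by event_month
"""
result = conn.execute(query).fetchall()
print(result)  # [('2023-06', 2), ('2023-07', 1)]
```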
# Run with `dagster dev -f observable_source_assets1.py`, then enable the auto-materialization daemon.
# The downstream assets update as new versions of the live data are observed.
from datetime import datetime

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


def today():
    return datetime.today().date()


def now():
    # Assumed body: the current timestamp, mirroring today() above.
    return datetime.now()


# Observed once a day; the data version changes when the date changes.
@observable_source_asset(auto_observe_interval_minutes=24 * 60)
def daily_data():
    return DataVersion(str(today()))


# Observed every 10 minutes.
@observable_source_asset(auto_observe_interval_minutes=10)
def live_data():
    # Assumed body: each observation reports the current timestamp as its version.
    return DataVersion(str(now()))
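The observation functions `daily_data` and `live_data` return a `DataVersion`; roughly, downstream assets are treated as out of date when a newly observed version differs from the previous one. The toy tracker below shows that comparison in isolation. It is a hypothetical helper, not Dagster's implementation, and the version strings are illustrative.

```python
from typing import Dict


class VersionTracker:
    """Hypothetical helper: remembers the last data version seen per source."""

    def __init__(self) -> None:
        self._last: Dict[str, str] = {}

    def observe(self, source: str, version: str) -> bool:
        """Record an observation; return True if the version is new."""
        changed = self._last.get(source) != version
        self._last[source] = version
        return changed


tracker = VersionTracker()
print(tracker.observe("daily_data", "2023-06-01"))  # True (first observation)
print(tracker.observe("daily_data", "2023-06-01"))  # False (same day, same version)
print(tracker.observe("daily_data", "2023-06-02"))  # True (date changed)
```

This is why the daily source triggers at most one downstream update per day even though it could be observed more often, while a timestamp-based version changes on every observation.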
This example shows pseudo-code for a Dagster pipeline that:
To run the pipeline:
from itertools import permutations

from dagster import AssetKey, DailyPartitionsDefinition, Definitions, OpExecutionContext, asset

# Root nodes
date = DailyPartitionsDefinition(start_date="2023-06-01")
colors = ["blue", "red"]
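`DailyPartitionsDefinition(start_date="2023-06-01")` produces one key per day from the start date onward (up to the current time when no end date is given). Pairing those dates with the `colors` list yields one key per (date, color) combination. The plain-Python sketch below computes such keys with an explicit end date; the `date|color` string format and the helpers `daily_keys` and `combined_keys` are hypothetical, and it uses `itertools.product` to pair every date with every color.

```python
from datetime import date, timedelta
from itertools import product
from typing import List

colors = ["blue", "red"]


def daily_keys(start: date, end: date) -> List[str]:
    """Daily keys from start (inclusive) to end (exclusive), formatted YYYY-MM-DD."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def combined_keys(start: date, end: date) -> List[str]:
    """Pair every date with every color, e.g. '2023-06-01|blue' (hypothetical format)."""
    return [f"{day}|{color}" for day, color in product(daily_keys(start, end), colors)]


keys = combined_keys(date(2023, 6, 1), date(2023, 6, 3))
print(keys)
# ['2023-06-01|blue', '2023-06-01|red', '2023-06-02|blue', '2023-06-02|red']
```

The number of keys grows as (number of days) times (number of colors), which is worth keeping in mind before adding more colors to a long-running daily partition set.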