This example shows how to use ops to create a graph of tasks executed as a Dagster job with specific configuration and resources. The example also shows how to use a custom schedule.
To start:

```shell
pip install dagster dagit
dagster dev -f ops_example.py
```

```python
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
import os
import pandas as pd
from datetime import datetime, timedelta

USER_GRANTS_QUERY = """
query UsersByRole {
  usersOrError {
```
```python
from dagster import DailyPartitionsDefinition
from dagster import asset, OpExecutionContext, Definitions, AssetKey
from itertools import permutations

# Root Nodes
date = DailyPartitionsDefinition(start_date="2023-06-01")
colors = ["blue", "red"]
```
This example shows the pseudo-code for a Dagster pipeline.
To run the pipeline:

```shell
dagster dev -f observable_source_assets1.py
```

Then enable the auto-materialization daemon; the downstreams will update as the live data is observed.

```python
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime


def today():
    return datetime.today().date()


def now():
    # Body truncated in the source; assumed to return the current timestamp
    return datetime.now()


@observable_source_asset(auto_observe_interval_minutes=24 * 60)
def daily_data():
    # Observed once a day; the data version changes when the date changes
    return DataVersion(str(today()))


@observable_source_asset(auto_observe_interval_minutes=10)
def live_data():
    # Body truncated in the source; assumed to mirror daily_data using now()
    return DataVersion(str(now()))
```
```sql
with events as (
    select distinct
        DATE_FORMAT(timestamp, '%Y-%m') as event_month,
        dagster_event_type,
        concat(run_id, '||', step_key) as step_id,
        count(1) as credits
    from event_logs
    where dagster_event_type = 'STEP_START'
```
```yaml
profile:
  name: FN
  stocks_to_index:
    - ticker: NFLX
    - ticker: META
  index_strategy:
    type: equal
  forecast:
    days: 60
```
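A config like this can be parsed into Python before the pipeline is built. The following is a minimal sketch, assuming PyYAML and an equal-weight interpretation of `index_strategy: type: equal`; it is not the example's actual loading code:

```python
# Hypothetical loader for the profile config above (requires pyyaml).
import yaml

config_text = """
profile:
  name: FN
  stocks_to_index:
    - ticker: NFLX
    - ticker: META
  index_strategy:
    type: equal
  forecast:
    days: 60
"""

profile = yaml.safe_load(config_text)["profile"]
tickers = [s["ticker"] for s in profile["stocks_to_index"]]

# Equal-weight strategy assumption: each ticker contributes 1/N to the index
weights = {t: 1 / len(tickers) for t in tickers}
```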
This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.
The actual extract and load logic is omitted; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.
First, a single pipeline for one table is created. This is demonstrated in the file dagster_mock_one_table.py. To run this example:

```shell
pip install dagster dagster-webserver
```