Skip to content

Instantly share code, notes, and snippets.

@slopp
slopp / example.py
Created September 24, 2024 21:42
IO Manager that Depends on Resource
from typing import Any
from dagster import ConfigurableResource, ConfigurableIOManager, InputContext, OutputContext, asset, Definitions, ResourceDependency, EnvVar
from pydantic import Field
# https://docs.dagster.io/concepts/resources#resources-that-depend-on-other-resources
class myResource(ConfigurableResource):
    """Dagster configurable resource holding a username/password pair.

    Both values are declared as string config fields via pydantic ``Field``.
    Per the Dagster docs linked above (resources that depend on other
    resources), this is presumably injected into another resource or IO
    manager as a dependency -- TODO confirm against the rest of the gist.

    NOTE(review): the class name is not PascalCase; it is kept unchanged so
    existing references keep working.
    """

    # Login name, supplied through Dagster resource config.
    username: str = Field(description="the username")
    # Secret credential, a plain ``str`` config field. NOTE(review): consider
    # supplying it with ``EnvVar(...)`` (imported above) instead of a literal.
    password: str = Field(description="the password")
@slopp
slopp / job.py
Created September 23, 2024 19:31
Canary Ping Dagster
import os
from dagster import define_asset_job, load_assets_from_package_module, repository, with_resources, op, job, ScheduleDefinition
from my_dagster_project import assets
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_payload import MetricPayload
from datadog_api_client.v2.model.metric_point import MetricPoint
from datetime import datetime
@slopp
slopp / palmer.py
Last active August 28, 2024 13:03
Palmer ML Workflow with Dagster
import datetime
import pins
import os
import seaborn as sns
from dagster import asset, asset_check, AssetCheckResult
from posit import connect # install as uv pip install posit-sdk
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
@slopp
slopp / assets.py
Created June 27, 2024 18:04
Custom dagster asset decorator
from dagster import asset
# Add the same owner to every asset built with this decorator, so users don't have to specify it themselves.
def bi_team_asset(**asset_decorator_kwargs):
    """Build a Dagster ``@asset`` decorator that always sets the team owner.

    Usage::

        @bi_team_asset(group_name="bi")
        def my_table(upstream_table): ...

    Args:
        **asset_decorator_kwargs: Forwarded unchanged to ``dagster.asset``.
            Must not include ``owners`` or ``name``: both are supplied here,
            and passing them again raises ``TypeError`` (duplicate keyword).

    Returns:
        A decorator that turns a plain function into a Dagster asset named
        after that function.
    """

    def _wrapper(f):
        # Apply ``asset`` to ``f`` itself instead of to a ``**kwargs``
        # pass-through: Dagster inspects the decorated function's signature to
        # discover upstream inputs and an optional ``context`` parameter, and a
        # generic wrapper would hide those parameters.
        # NOTE(review): the owner literal looks like scraped email-obfuscation
        # residue; replace it with the real owner address.
        return asset(
            **asset_decorator_kwargs,
            owners=["[email protected]"],
            name=f.__name__,
        )(f)

    # A decorator factory must return the decorator; otherwise
    # ``@bi_team_asset(...)`` would end up calling ``None``.
    return _wrapper
@slopp
slopp / config.json
Created June 27, 2024 17:27
Dagster asset factory example
[
{
"dag_id": "powerbi1",
"python_script": "some_script.py",
"dataset_id": "fdsjl4539fdjsk"
},
{
"dag_id": "powerbi2",
"python_script": "some_other_script.py",
"dataset_id": "89fdskfds0"
@slopp
slopp / ReadMe.md
Last active May 13, 2024 20:59
Example of different automation policies

This example shows how to create an asset graph in which the root assets are materialized by various schedules and the downstream assets are materialized by different auto-materialization policies.

Screen Shot 2024-05-13 at 2 57 37 PM

To install:

pip install dagster dagster-webserver
@slopp
slopp / README.md
Created February 27, 2024 16:14
SQL Server to GCS to BQ Dagster Pipeline Example

This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.

The actual extract and load logic is omitted; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.

First, a single pipeline for one table is created. This is demonstrated in the file dagster_mock_one_table.py. To run this example:

  1. Create a Python virtual environment and then run:
pip install dagster dagster-webserver
@slopp
slopp / fn_profile.yaml
Created January 23, 2024 16:01
Dagster with a custom DSL
profile:
name: FN
stocks_to_index:
- ticker: NFLX
- ticker: META
index_strategy:
type: equal
forecast:
days: 60
@slopp
slopp / credits.sql
Created September 28, 2023 21:01
Credits Dagster OSS
with events as (select distinct
DATE_FORMAT(timestamp, '%Y-%m') as event_month,
dagster_event_type,
coalesce(run_id, '||', step_key) as step_id,
count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
@slopp
slopp / observable_source_assets1.py
Last active August 10, 2023 17:17
Example of Observable Source Assets
# run with `dagster dev -f observable_source_assets1.py`, then enable the auto-materialization daemon
# the downstreams will update as the live data is observed
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime
def today():
    """Return today's local calendar date as a ``datetime.date``."""
    # ``datetime.now()`` with no tz argument is the same naive local time
    # that ``datetime.today()`` returns; keep only the date part.
    current_moment = datetime.now()
    return current_moment.date()
def now():