Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
cnolanminich / automation_asset.py
Created June 5, 2025 18:35
automation conditions that update once per period once all upstreams are materialized, and then again if any upstreams are materialized within that timeframe
# Runs every 5 minutes (cron "*/5 * * * *") for demonstration purposes.
# Built from `newly_updated()` chained with `.since(...)` on the cron
# condition below; the "*/5 * * * *" schedule ticks every five minutes.
daily_success_condition = dg.AutomationCondition.newly_updated().since(
    dg.AutomationCondition.on_cron("*/5 * * * *")
)
custom_condition = (
dg.AutomationCondition.on_cron("*/5 * * * *") # Runs every 5 minutes
| (
dg.AutomationCondition.any_deps_updated() # When any dependency updates
@cnolanminich
cnolanminich / historical_concurrency.sql
Last active May 21, 2025 12:20
Get historical concurrency for Dagster OSS instance
with seconds_in_day as (
select generate_series(0, 86400 - 1) as sec
)
, base_days as (
select generate_series(current_date - 7,current_date,'1 day'::interval) as days
),
days_and_seconds as (
select
base_days.days,
base_days.days + (seconds_in_day.sec || ' seconds')::interval as start_bucket,
@cnolanminich
cnolanminich / credits.sql
Created May 14, 2025 16:35
Dagster OSS Credit estimate
with events as (select distinct
DATE_FORMAT(timestamp, '%Y-%m') as event_month,
dagster_event_type,
coalesce(run_id, '||', step_key) as step_id,
count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
@cnolanminich
cnolanminich / dbt_sources_as_external_assets.py
Created January 16, 2025 16:59
example of building dbt external assets from
import json
import textwrap
from typing import Any, Mapping, List, Tuple
from dagster import (
AutomationCondition,
AssetKey,
BackfillPolicy,
DailyPartitionsDefinition,
job,
op,
@cnolanminich
cnolanminich / deployment
Created January 7, 2025 21:15
Dagster GitHub Action with releases for prod and dev for pushes to main
name: Dagster Cloud Hybrid Deployment
on:
push: # For full deployment
branches:
- "main"
- "master"
pull_request: # For branch deployments
types: [opened, synchronize, reopened, closed]
release:
types: [published]
@cnolanminich
cnolanminich / postgres_resource.py
Created December 11, 2024 20:39
example postgres resource
import pandas as pd
from dagster import asset, ResourceParam
from sqlalchemy import create_engine
# Define the PostgreSQL resource
class PostgresResource:
def __init__(self, connection_string):
self.connection_string = connection_string
def get_connection(self):
@cnolanminich
cnolanminich / ops_no_data.py
Created December 3, 2024 20:05
ops_no_data
import dagster as dg
# Op that takes no inputs, returns nothing, and only writes an info-level
# message to the op's logger.
@dg.op
def first_op(context: dg.OpExecutionContext) -> None:
    op_logger = context.log
    op_logger.info("Creating op one")
# Op that declares a `start` input of type `dg.Nothing`; the function takes
# no matching argument and only writes an info-level message to the op's
# logger.
@dg.op(ins={"start": dg.In(dg.Nothing)})
def second_op(context: dg.OpExecutionContext) -> None:
    op_logger = context.log
    op_logger.info("Creating op two")
@cnolanminich
cnolanminich / dagster_with_sagemaker.py
Created November 27, 2024 20:08
example sagemaker pipeline
import boto3
import time
# Define an asset that uses the custom resource
@asset
def sagemaker_pipeline() -> str:
# note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource
# this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible
# Step 1: Set up the SageMaker session and client
@cnolanminich
cnolanminich / assets
Created November 8, 2024 17:21
querying assets and jobs
query GetAssetDetails($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
... on AssetNode {
assetKey {
path
}
dataVersion
description
groupName
hasAssetChecks
@cnolanminich
cnolanminich / step_functions.py
Created October 30, 2024 14:42
Dagster + AWS Step Functions Example
import boto3
import json
import time
from dagster import asset, AssetExecutionContext
@asset
def execute_step_function(context: AssetExecutionContext):
# Initialize the boto3 client for Step Functions
client = boto3.client('stepfunctions', region_name='us-west-2')