Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
cnolanminich / simple_databricks_asset.py
Created July 23, 2025 17:28
simple_databricks_asset
# hooli-ml/databricks_mlops/dagster_pipeline.py
import dagster as dg
from hooli_ml.defs.resources import get_env, MLWorkflowConfig, DatabricksResource
from databricks.sdk.service import jobs
NOTEBOOK_ROOT_PATH = "/Users/[email protected]/.bundle/databricks_mlops/dev/files/"
@dg.asset(kinds={"databricks", "feature_engineering"})
def feature_engineering_pickup(context: dg.AssetExecutionContext, databricks_resource: DatabricksResource):
run_id = databricks_resource.run_and_stream_notebook_logs_sdk(
@cnolanminich
cnolanminich / automation_asset.py
Created June 5, 2025 18:35
automation conditions that update once per period once all upstreams are materialized, and then again if any upstreams are materialized within that timeframe
# The cron expression "*/5 * * * *" fires every five minutes (a short period
# chosen for demonstration purposes), not once a day as the name suggests.
# This condition chains `newly_updated()` with `.since(...)` of that cron
# condition; see the Dagster AutomationCondition docs for `since` semantics.
daily_success_condition = dg.AutomationCondition.newly_updated().since(
    dg.AutomationCondition.on_cron("*/5 * * * *")
)
custom_condition = (
dg.AutomationCondition.on_cron("*/5 * * * *")  # Runs every 5 minutes
| (
dg.AutomationCondition.any_deps_updated() # When any dependency updates
@cnolanminich
cnolanminich / historical_concurrency.sql
Last active May 21, 2025 12:20
Get historical concurrency for Dagster OSS instance
with seconds_in_day as (
select generate_series(0, 86400 - 1) as sec
)
, base_days as (
select generate_series(current_date - 7,current_date,'1 day'::interval) as days
),
days_and_seconds as (
select
base_days.days,
base_days.days + (seconds_in_day.sec || ' seconds')::interval as start_bucket,
@cnolanminich
cnolanminich / credits.sql
Created May 14, 2025 16:35
Dagster OSS Credit estimate
with events as (select distinct
DATE_FORMAT(timestamp, '%Y-%m') as event_month,
dagster_event_type,
coalesce(run_id, '||', step_key) as step_id,
count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
@cnolanminich
cnolanminich / dbt_sources_as_external_assets.py
Created January 16, 2025 16:59
example of building dbt external assets from dbt sources
import json
import textwrap
from typing import Any, Mapping, List, Tuple
from dagster import (
AutomationCondition,
AssetKey,
BackfillPolicy,
DailyPartitionsDefinition,
job,
op,
@cnolanminich
cnolanminich / deployment
Created January 7, 2025 21:15
Dagster GitHub Action with releases for prod and dev for pushes to main
name: Dagster Cloud Hybrid Deployment
on:
push: # For full deployment
branches:
- "main"
- "master"
pull_request: # For branch deployments
types: [opened, synchronize, reopened, closed]
release:
types: [published]
@cnolanminich
cnolanminich / postgres_resource.py
Created December 11, 2024 20:39
example postgres resource
import pandas as pd
from dagster import asset, ResourceParam
from sqlalchemy import create_engine
# Define the PostgreSQL resource
class PostgresResource:
def __init__(self, connection_string):
self.connection_string = connection_string
def get_connection(self):
@cnolanminich
cnolanminich / ops_no_data.py
Created December 3, 2024 20:05
ops_no_data
import dagster as dg
@dg.op
def first_op(context: dg.OpExecutionContext) -> None:
    """Emit an info-level log line for the first op; produces no output.

    Args:
        context: Execution context supplied by Dagster; only its logger
            is used here.
    """
    op_logger = context.log
    op_logger.info("Creating op one")
@dg.op(ins={"start": dg.In(dg.Nothing)})
def second_op(context: dg.OpExecutionContext) -> None:
    """Emit an info-level log line for the second op; produces no output.

    The decorator declares an input named ``start`` typed ``dg.Nothing``.
    The function has no ``start`` parameter, so nothing is read from that
    input here -- presumably it exists only to order this op after an
    upstream op (confirm against the job definition that wires the ops).

    Args:
        context: Execution context supplied by Dagster; only its logger
            is used here.
    """
    op_logger = context.log
    op_logger.info("Creating op two")
@cnolanminich
cnolanminich / dagster_with_sagemaker.py
Created November 27, 2024 20:08
example sagemaker pipeline
import boto3
import time
# Define an asset that uses the custom resource
@asset
def sagemaker_pipeline() -> str:
# note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource
# this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible
# Step 1: Set up the SageMaker session and client
@cnolanminich
cnolanminich / assets
Created November 8, 2024 17:21
querying assets and jobs
query GetAssetDetails($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
... on AssetNode {
assetKey {
path
}
dataVersion
description
groupName
hasAssetChecks