cnolanminich / assets
Created November 8, 2024 17:21
querying assets and jobs
query GetAssetDetails($assetKey: AssetKeyInput!) {
  assetNodeOrError(assetKey: $assetKey) {
    ... on AssetNode {
      assetKey {
        path
      }
      dataVersion
      description
      groupName
      hasAssetChecks
    }
  }
}
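One way to run the query above is to POST it to a Dagster webserver's GraphQL endpoint. A minimal sketch with requests, assuming a local webserver at http://localhost:3000/graphql and a placeholder asset key; a Dagster+ deployment would additionally need an API token header.

import requests

GRAPHQL_ENDPOINT = "http://localhost:3000/graphql"  # assumed local webserver URL

ASSET_DETAILS_QUERY = """
query GetAssetDetails($assetKey: AssetKeyInput!) {
  assetNodeOrError(assetKey: $assetKey) {
    ... on AssetNode {
      assetKey { path }
      dataVersion
      description
      groupName
      hasAssetChecks
    }
  }
}
"""

# "my_asset" is a placeholder asset key path
response = requests.post(
    GRAPHQL_ENDPOINT,
    json={
        "query": ASSET_DETAILS_QUERY,
        "variables": {"assetKey": {"path": ["my_asset"]}},
    },
)
response.raise_for_status()
print(response.json()["data"]["assetNodeOrError"])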
cnolanminich / step_functions.py
Created October 30, 2024 14:42
Dagster + AWS Step Functions Example
import boto3
import json
import time

from dagster import asset, AssetExecutionContext


@asset
def execute_step_function(context: AssetExecutionContext):
    # Initialize the boto3 client for Step Functions
    client = boto3.client('stepfunctions', region_name='us-west-2')
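    # The preview stops here; what follows is a sketch of a possible continuation.
    # The state machine ARN and input payload are placeholders, not from the gist.
    execution = client.start_execution(
        stateMachineArn='arn:aws:states:us-west-2:123456789012:stateMachine:my-state-machine',
        input=json.dumps({"triggered_by": "dagster"}),
    )

    # Poll until the execution leaves the RUNNING state
    while True:
        description = client.describe_execution(executionArn=execution['executionArn'])
        status = description['status']
        if status != 'RUNNING':
            break
        time.sleep(10)

    context.log.info(f"Step Function finished with status {status}")
    if status != 'SUCCEEDED':
        raise Exception(f"Step Function execution failed with status {status}")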
cnolanminich / definitions.py
Created October 15, 2024 20:31
turn on or off schedules in bulk, or have schedules that expire
from dagster import (
    Definitions, load_assets_from_modules, AssetSelection, define_asset_job,
    op, OpExecutionContext, job, asset, asset_check, ScheduleDefinition,
)
import requests
from datetime import datetime

# can be modified to work with auth in dagster+
graphql_endpoint = "http://localhost:3000/graphql"  # Adjust the URL as needed
code_location_name = "turn_on_schedules_in_bulk"  # Adjust the code location name as needed

# utility function to get all schedules for a code location
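# (Sketch under assumptions: the gist is cut off here. This version uses Dagster's
#  GraphQL `schedulesOrError` query, and "__repository__" is the implicit repository
#  name for a Definitions-based code location; adjust for your deployment.)
def get_schedules_for_code_location(endpoint: str, location_name: str) -> list[str]:
    query = """
    query SchedulesQuery($repositorySelector: RepositorySelector!) {
      schedulesOrError(repositorySelector: $repositorySelector) {
        ... on Schedules {
          results {
            name
          }
        }
      }
    }
    """
    variables = {
        "repositorySelector": {
            "repositoryLocationName": location_name,
            "repositoryName": "__repository__",
        }
    }
    response = requests.post(endpoint, json={"query": query, "variables": variables})
    response.raise_for_status()
    schedules = response.json()["data"]["schedulesOrError"].get("results", [])
    return [schedule["name"] for schedule in schedules]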
cnolanminich / run_status_sensors.py
Last active October 4, 2024 20:43
run status sensor that only runs once both upstream jobs complete
from dagster import (
    run_status_sensor, DagsterRunStatus, RunRequest, SkipReason, job, sensor,
    define_asset_job, SensorEvaluationContext, DagsterInstance, RunsFilter,
    Definitions, asset, AssetExecutionContext,
)
from datetime import datetime, timedelta
import json


@asset
def first_asset(context: AssetExecutionContext) -> None:
    context.log.info("First asset")


@asset
def second_asset(context: AssetExecutionContext) -> None:
    context.log.info("Second asset")
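The sensor itself is cut off in the preview. A sketch of the pattern the description refers to, under assumptions: the job names and the 24-hour lookback window are placeholders, and the sensor requests a run of a downstream job only once both monitored jobs have a recent successful run.

first_job = define_asset_job("first_job", selection=[first_asset])
second_job = define_asset_job("second_job", selection=[second_asset])


@asset
def downstream_asset(context: AssetExecutionContext) -> None:
    context.log.info("Runs only after both upstream jobs have succeeded")


downstream_job = define_asset_job("downstream_job", selection=[downstream_asset])


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[first_job, second_job],
    request_job=downstream_job,
)
def both_jobs_complete_sensor(context):
    # Fires whenever either monitored job succeeds; only request a downstream run
    # once both jobs have a successful run within the last 24 hours.
    cutoff = datetime.now() - timedelta(hours=24)
    for job_name in ("first_job", "second_job"):
        recent_successes = context.instance.get_runs(
            filters=RunsFilter(
                job_name=job_name,
                statuses=[DagsterRunStatus.SUCCESS],
                updated_after=cutoff,
            ),
            limit=1,
        )
        if not recent_successes:
            return SkipReason(f"No recent successful run of {job_name}")
    return RunRequest(run_key=context.dagster_run.run_id)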
cnolanminich / jenkins.file
Last active September 13, 2024 16:29
Jenkinsfile for a Dagster+ hybrid deployment
pipeline {
    agent any
    environment {
        // Construct the IMAGE_TAG using Jenkins environment variables
        IMAGE_TAG = "${env.GIT_COMMIT}-${env.BUILD_ID}"
        AWS_ACCESS_KEY_ID = credentials('aws-access-key-id')         // Reference to the AWS access key ID secret
        AWS_SECRET_ACCESS_KEY = credentials('aws-secret-access-key') // Reference to the AWS secret access key secret
        AWS_REGION = 'us-west-2'                                     // Set your AWS region
    }
cnolanminich / conditional_schedule.py
Created September 11, 2024 02:50
Schedule that only runs if the upstream asset is fresh
import dagster as dg
from datetime import timedelta


# Upstream asset that should be fresh
@dg.asset
def upstream_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("Upstream asset is being computed")


@dg.asset
def downstream_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("Downstream asset is being computed")
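The schedule itself is cut off in the preview. A sketch of the described pattern, under assumptions: the cron string and the one-hour freshness window are placeholders, and the schedule skips when the upstream asset's latest materialization is older than the window.

import time

downstream_job = dg.define_asset_job("downstream_job", selection=[downstream_asset])

FRESHNESS_WINDOW = timedelta(hours=1)  # placeholder freshness window


@dg.schedule(cron_schedule="*/15 * * * *", job=downstream_job)
def fresh_upstream_schedule(context: dg.ScheduleEvaluationContext):
    # Look up the most recent materialization of the upstream asset
    latest_event = context.instance.get_latest_materialization_event(
        dg.AssetKey("upstream_asset")
    )
    if latest_event is None:
        return dg.SkipReason("upstream_asset has never been materialized")

    age_seconds = time.time() - latest_event.timestamp
    if age_seconds > FRESHNESS_WINDOW.total_seconds():
        return dg.SkipReason(
            f"upstream_asset is {age_seconds:.0f}s old, beyond the freshness window"
        )
    return dg.RunRequest()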
cnolanminich / snowflake_sensor.py
Created August 23, 2024 21:13
sensor that checks multiple snowflake tables and ensures they're all
from dagster import (
    sensor,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    SkipReason,
)
from datetime import datetime

# this would be your job
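The job and sensor are cut off in the preview. A sketch of one way to implement the described check, under assumptions: the table names, the job, and the Snowflake credentials (read from environment variables here) are placeholders, and freshness is read from INFORMATION_SCHEMA.TABLES.LAST_ALTERED via snowflake-connector-python.

import json
import os

import snowflake.connector
from dagster import job, op

TABLES_TO_WATCH = ["ORDERS", "CUSTOMERS", "PAYMENTS"]  # placeholder table names


@op
def process_new_data(context):
    context.log.info("Processing newly updated Snowflake tables")


@job
def my_snowflake_job():
    process_new_data()


def get_last_altered_timestamps(tables: list[str]) -> dict[str, datetime]:
    """Return {table_name: LAST_ALTERED} for the given tables."""
    with snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        database=os.environ["SNOWFLAKE_DATABASE"],
    ) as conn:
        cur = conn.cursor()
        placeholders = ", ".join(["%s"] * len(tables))
        cur.execute(
            "SELECT table_name, last_altered FROM information_schema.tables "
            f"WHERE table_name IN ({placeholders})",
            tables,
        )
        return {row[0]: row[1] for row in cur.fetchall()}


@sensor(job=my_snowflake_job, minimum_interval_seconds=300)
def snowflake_tables_sensor(context: SensorEvaluationContext):
    last_seen = json.loads(context.cursor) if context.cursor else {}
    latest = get_last_altered_timestamps(TABLES_TO_WATCH)

    # Only request a run once every watched table has changed since the stored cursor
    for table in TABLES_TO_WATCH:
        if table not in latest or latest[table].isoformat() == last_seen.get(table):
            return SkipReason(f"{table} has not been updated since the last check")

    context.update_cursor(json.dumps({t: latest[t].isoformat() for t in TABLES_TO_WATCH}))
    return RunRequest(run_key=None)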
cnolanminich / dlt_assets.py
Created July 26, 2024 16:34
example using dlt to ingest from S3
# after running dlt init filesystem duckdb
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
import dlt
from dlt_sources.filesystem_pipeline import s3_locations_data
from dlt_sources.filesystem import readers
from pathlib import Path
import os
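The asset definition is cut off in the preview. A sketch of the usual dagster-embedded-elt pattern for the imports above, under assumptions: the call signature of s3_locations_data, the pipeline name, and the dataset name are placeholders, and the DagsterDltResource is expected under the resource key "dlt".

@dlt_assets(
    dlt_source=s3_locations_data(),  # arguments depend on how the source is defined
    dlt_pipeline=dlt.pipeline(
        pipeline_name="s3_locations",
        destination="duckdb",
        dataset_name="locations_data",
    ),
    name="s3_locations",
    group_name="dlt",
)
def dagster_s3_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)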
cnolanminich / .env
Last active July 26, 2024 16:32
Use dlt OpenAPI codegen to create a Dagster pipeline
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__BASE_URL=https://api.github.com
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__ACCESS_TOKEN={your_token_here}
cnolanminich / definitions.py
Last active July 26, 2024 14:01
Example dlt loading from s3
from dagster import (
    Definitions,
)
from .assets.dlt_assets import dagster_s3_assets

defs = Definitions(
    assets=[dagster_s3_assets],
)
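For the dlt-backed assets to run, the DagsterDltResource generally has to be provided as well. A variant of the same Definitions with the resource wired in, as a sketch under an assumption: the resource key ("dlt" here) must match the resource parameter name on the @dlt_assets-decorated function.

from dagster import Definitions
from dagster_embedded_elt.dlt import DagsterDltResource

from .assets.dlt_assets import dagster_s3_assets

defs = Definitions(
    assets=[dagster_s3_assets],
    resources={"dlt": DagsterDltResource()},
)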