This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Automation condition built from `newly_updated()` chained with
# `.since(on_cron(...))`.
# The cron expression "*/5 * * * *" fires every 5 minutes (not at 5 minutes
# past the hour); the short interval is for demonstration purposes.
daily_success_condition = (
    dg.AutomationCondition.newly_updated()
    .since(dg.AutomationCondition.on_cron("*/5 * * * *"))
)
custom_condition = ( | |
dg.AutomationCondition.on_cron("*/5 * * * *") # Runs every 5 minutes | |
| ( | |
dg.AutomationCondition.any_deps_updated() # When any dependency updates |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with seconds_in_day as ( | |
select generate_series(0, 86400 - 1) as sec | |
) | |
, base_days as ( | |
select generate_series(current_date - 7,current_date,'1 day'::interval) as days | |
), | |
days_and_seconds as ( | |
select | |
base_days.days, | |
base_days.days + (seconds_in_day.sec || ' seconds')::interval as start_bucket, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with events as (select distinct | |
DATE_FORMAT(timestamp, '%Y-%m') as event_month, | |
dagster_event_type, | |
coalesce(run_id, '||', step_key) as step_id, | |
count(1) as credits | |
from event_logs | |
where dagster_event_type = 'STEP_START' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import textwrap | |
from typing import Any, Mapping, List, Tuple | |
from dagster import ( | |
AutomationCondition, | |
AssetKey, | |
BackfillPolicy, | |
DailyPartitionsDefinition, | |
job, | |
op, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: Dagster Cloud Hybrid Deployment | |
on: | |
push: # For full deployment | |
branches: | |
- "main" | |
- "master" | |
pull_request: # For branch deployments | |
types: [opened, synchronize, reopened, closed] | |
release: | |
types: [published] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
from dagster import asset, ResourceParam | |
from sqlalchemy import create_engine | |
# Define the PostgreSQL resource | |
class PostgresResource: | |
def __init__(self, connection_string): | |
self.connection_string = connection_string | |
def get_connection(self): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dagster as dg | |
@dg.op
def first_op(context: dg.OpExecutionContext) -> None:
    """Log an info-level message announcing op one.

    Args:
        context: Execution context supplied by Dagster; only its logger is used.
    """
    context.log.info("Creating op one")
# The `start` input is typed `dg.Nothing`, so it has no matching function
# parameter. NOTE(review): presumably used only to order this op after an
# upstream op without passing data; confirm against the job wiring.
@dg.op(ins={"start": dg.In(dg.Nothing)})
def second_op(context: dg.OpExecutionContext) -> None:
    """Log an info-level message announcing op two.

    Args:
        context: Execution context supplied by Dagster; only its logger is used.
    """
    context.log.info("Creating op two")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import time | |
# Define an asset that uses the custom resource | |
@asset | |
def sagemaker_pipeline() -> str: | |
# note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource | |
# this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible | |
# Step 1: Set up the SageMaker session and client |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
query GetAssetDetails($assetKey: AssetKeyInput!) { | |
assetNodeOrError(assetKey: $assetKey) { | |
... on AssetNode { | |
assetKey { | |
path | |
} | |
dataVersion | |
description | |
groupName | |
hasAssetChecks |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import time | |
from dagster import asset, AssetExecutionContext | |
@asset | |
def execute_step_function(context: AssetExecutionContext): | |
# Initialize the boto3 client for Step Functions | |
client = boto3.client('stepfunctions', region_name='us-west-2') |
NewerOlder