Skip to content

Instantly share code, notes, and snippets.

@slopp
slopp / README.md
Last active October 12, 2022 15:16
Airflow Usage Minutes

Dagster Cloud pricing is based on usage. If you are using Airflow, you can get an estimate of this usage by querying the Airflow metadata database.

For SQLite:

# navigate to your airflow DB, normally ~/airflow
sqlite3
.open airflowdb
SELECT SUM((julianday(end_date) - julianday(start_date))* 24 * 60) AS run_minutes FROM dag_run;
@slopp
slopp / HotTakes.md
Last active June 20, 2023 21:49
Dagster Hot Takes

Dagster Hot Takes

Less On-Call Pages: Retries and Alerts

https://youtu.be/A6WtkMwe4VQ

Getting an on-call page is the worst. Unfortunately most task-based orchestrators page teams frequently, whenever jobs fail. With Dagster you can reduce this alert fatigue by using retry strategies and only getting notified when SLAs are violated.

Resources:

@slopp
slopp / assets.py
Last active November 8, 2022 19:10
Modern Data Stack Ops vs Assets
# repository.py
from dagster import job, ScheduleDefinition, repository, with_resources
from dagster_fivetran import fivetran_resource, build_fivetran_assets
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_manifest
my_fivetran_resource = fivetran_resource.configured(
{
"api_key": {"env": "FIVETRAN_API_KEY"},
@slopp
slopp / repository.py
Created November 2, 2022 19:52
Assets vs Ops
# repository.py
from dagster import job, ScheduleDefinition, repository, with_resources
from dagster_fivetran import fivetran_resource, fivetran_sync_op
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op
my_fivetran_resource = fivetran_resource.configured(
{
"api_key": {"env": "FIVETRAN_API_KEY"},
"api_secret": {"env": "FIVETRAN_API_SECRET"},
@slopp
slopp / ecs-agent-vpc.yaml
Created November 7, 2022 14:41
Dagster Cloud ECS Template
AWSTemplateFormatVersion: 2010-09-09
Description: >
Deploys a Dagster Cloud user agent to a new or existing ECS cluster in a new VPC.
Version DAGSTER_VERSION.
See https://docs.dagster.cloud/agents/ecs/setup for more details.
Outputs:
TemplateVersion:
Description: The revision of the ECS agent template.
@slopp
slopp / dagster_project.py
Last active November 11, 2022 18:27
Example SQLAlchemy Dagster IO Manager
# ---------------------------------------------------
# Run this example with:
# dagit -f dagster_project.py
from dagster import asset, IOManager, with_resources, repository, io_manager, StringSource
import pandas as pd
import random
import sqlalchemy
import os
@slopp
slopp / repo.py
Last active November 28, 2022 18:55
Snowflake and Source Assets
from dagster import SourceAsset, asset, repository, AssetKey, AssetIn, with_resources, define_asset_job, AssetSelection, DailyPartitionsDefinition
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
import pandas as pd
# Use a SourceAsset to reference an existing table
# snowflake usually needs a schema, so we
# specify it via the AssetKey as ["schema", "table"]
# We tell dagster that this table is partitioned by day
orders = SourceAsset(
@slopp
slopp / snowflake_oauth.py
Created November 28, 2022 22:18
Snowflake Resource with OAuth
from dagster_snowflake import SnowflakeConnection
from typing import Mapping
from dagster._annotations import public
from dagster import resource, StringSource
class SnowflakeOAuthConnection(SnowflakeConnection):
def __init__(self, config: Mapping[str, str], log):
@slopp
slopp / project.py
Created November 29, 2022 18:53
Example Dagster Project for Debugging
from dagster import asset, repository, with_resources
from resources import snow_api
import pandas as pd
@asset(
required_resource_keys = {"snow_api"}
)
def snow_forecast(context):
snow_api = context.resources.snow_api
snow_forecast = snow_api.get(location = "abasin")
@slopp
slopp / README.md
Last active December 19, 2022 16:55
Diff Eqs in Dagster

Self-Dependent Asset Partitions

As of 1.1.7, Dagster supports assets that rely on prior versions of themselves, for example, an asset that implements a differential equation.

Getting Started

To run this example, first install the dependencies:

pip install dagster, dagit