@slopp
slopp / README.md
Last active December 29, 2022 00:15
[Draft] Dagster Robustness Guide

What is a robust data platform?

This guide centralizes concepts needed to run a "robust" production data platform using Dagster Cloud, where robust means assets and infrastructure are:

  • Fault Tolerant via replication, resource constraints, retries, parallelization, and run queues and priorities
  • Observable via customizable logs and useful alerts
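Run queues and priorities are easiest to picture with a toy model. The plain-Python sketch below is hypothetical and is not how Dagster Cloud implements its run queue: `ToyRunQueue`, `submit`, `dequeue_runnable`, and `finish` are illustrative names. It assumes a fixed `max_concurrent_runs` cap and launches queued runs highest priority first, breaking ties in submission order.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from typing import List, Set


@dataclass(order=True)
class QueuedRun:
    # heapq is a min-heap, so the sort key holds the negated priority:
    # a higher priority therefore pops first.
    sort_key: tuple
    run_id: str = field(compare=False)


class ToyRunQueue:
    """Hypothetical model of a run queue with priorities and a concurrency cap."""

    def __init__(self, max_concurrent_runs: int):
        self.max_concurrent_runs = max_concurrent_runs
        self._heap: List[QueuedRun] = []
        # Monotonic counter used as a tie-breaker: FIFO order within one priority.
        self._counter = itertools.count()
        self.in_progress: Set[str] = set()

    def submit(self, run_id: str, priority: int = 0) -> None:
        heapq.heappush(self._heap, QueuedRun((-priority, next(self._counter)), run_id))

    def dequeue_runnable(self) -> List[str]:
        """Launch queued runs, highest priority first, until the cap is reached."""
        launched = []
        while self._heap and len(self.in_progress) < self.max_concurrent_runs:
            run = heapq.heappop(self._heap)
            self.in_progress.add(run.run_id)
            launched.append(run.run_id)
        return launched

    def finish(self, run_id: str) -> None:
        # Freeing a slot lets the next dequeue_runnable call launch another run.
        self.in_progress.discard(run_id)
```

With a cap of 2, submitting `nightly_backfill` (priority -1), `hourly_refresh` (priority 0), and `exec_dashboard` (priority 5) launches `exec_dashboard` and `hourly_refresh` first; `nightly_backfill` waits until one of them calls `finish`.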

This guide does not cover every aspect of a production data platform. Other useful resources include:

  • Testing and CI/CD to ensure new code does what is expected without breaking existing assets
  • Project Structure to build a code base that can scale across teams and dependencies [Todo: Link to guide]
  • Data Expectations to ensure the data flowing through your pipelines is valid and meets your expectations [Todo: refresh guide for assets, add section on conditional behavior]
slopp / README.md
Last active December 19, 2022 16:55
Diff Eqs in Dagster

Self-Dependent Asset Partitions

As of version 1.1.7, Dagster supports assets whose partitions depend on earlier partitions of the same asset, such as an asset that implements a differential equation.
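The recurrence such a self-dependent asset computes can be sketched without Dagster. In the snippet below, `euler_step` and `materialize_partitions` are hypothetical helpers, and the equation dy/dt = -0.5 * y, the step size of one day, and the initial value of 100 are assumptions chosen for illustration. Each daily partition takes one forward-Euler step from the previous partition's value, and the first partition falls back to the initial condition because it has no predecessor.

```python
from datetime import date, timedelta
from typing import Callable, Dict, Optional


def euler_step(prev: Optional[float], rate: Callable[[float], float],
               dt: float = 1.0, initial: float = 100.0) -> float:
    """Compute one partition from the previous partition's value.

    The first partition has no predecessor, so it uses the initial condition.
    """
    if prev is None:
        return initial
    # Forward Euler: y_t = y_{t-1} + dt * f(y_{t-1})
    return prev + dt * rate(prev)


def materialize_partitions(start: date, days: int,
                           rate: Callable[[float], float]) -> Dict[str, float]:
    """Materialize daily partitions in order; each one reads the partition before it."""
    values: Dict[str, float] = {}
    prev: Optional[float] = None
    for offset in range(days):
        partition_key = (start + timedelta(days=offset)).isoformat()
        prev = euler_step(prev, rate)
        values[partition_key] = prev
    return values
```

For example, `materialize_partitions(date(2022, 12, 1), 4, lambda y: -0.5 * y)` yields 100.0, 50.0, 25.0, and 12.5 for December 1 through 4. Because each partition needs the one before it, partitions have to be computed in order, so a backfill of such an asset cannot simply run every partition in parallel.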

Getting Started

To run this example, first install the dependencies:

pip install dagster dagit
slopp / project.py
Created November 29, 2022 18:53
Example Dagster Project for Debugging
from dagster import asset, repository, with_resources
from resources import snow_api
import pandas as pd

@asset(
    required_resource_keys={"snow_api"}
)
def snow_forecast(context):
    snow_api = context.resources.snow_api
    snow_forecast = snow_api.get(location="abasin")
slopp / snowflake_oauth.py
Created November 28, 2022 22:18
Snowflake Resource with OAuth
from dagster_snowflake import SnowflakeConnection
from typing import Mapping
from dagster._annotations import public
from dagster import resource, StringSource

class SnowflakeOAuthConnection(SnowflakeConnection):
    def __init__(self, config: Mapping[str, str], log):
slopp / repo.py
Last active November 28, 2022 18:55
Snowflake and Source Assets
from dagster import SourceAsset, asset, repository, AssetKey, AssetIn, with_resources, define_asset_job, AssetSelection, DailyPartitionsDefinition
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
import pandas as pd
# Use a SourceAsset to reference an existing table.
# Snowflake usually needs a schema, so we
# specify it via the AssetKey as ["schema", "table"].
# We tell Dagster that this table is partitioned by day.
orders = SourceAsset(
slopp / dagster_project.py
Last active November 11, 2022 18:27
Example SQLAlchemy Dagster IO Manager
# ---------------------------------------------------
# Run this example with:
# dagit -f dagster_project.py
from dagster import asset, IOManager, with_resources, repository, io_manager, StringSource
import pandas as pd
import random
import sqlalchemy
import os
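The preview stops at the imports. As a rough, dependency-free sketch of what a SQL-backed IO manager does, the hypothetical `ToySQLiteIOManager` below uses the standard-library `sqlite3` module in place of SQLAlchemy. It mirrors the `handle_output` / `load_input` method names of Dagster's `IOManager`, but the real methods receive a context object (and, for `handle_output`, the asset's return value); here a plain asset name and a list of rows stand in for them so the sketch runs without Dagster.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[str, float]


class ToySQLiteIOManager:
    """Hypothetical stand-in for a SQL-backed IO manager (not Dagster's API)."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)

    def handle_output(self, asset_name: str, rows: List[Row]) -> None:
        # Each materialization replaces the asset's table wholesale.
        # The table name is interpolated directly, so only use trusted asset names.
        self.conn.execute(f'DROP TABLE IF EXISTS "{asset_name}"')
        self.conn.execute(f'CREATE TABLE "{asset_name}" (key TEXT, value REAL)')
        self.conn.executemany(f'INSERT INTO "{asset_name}" VALUES (?, ?)', rows)
        self.conn.commit()

    def load_input(self, asset_name: str) -> List[Row]:
        # A downstream asset reads the upstream table back by name.
        cur = self.conn.execute(f'SELECT key, value FROM "{asset_name}" ORDER BY key')
        return cur.fetchall()
```

Writing `[("b", 2.0), ("a", 1.0)]` under `raw_numbers` and loading it back returns `[("a", 1.0), ("b", 2.0)]`, sorted by key. Keeping storage logic in one object like this is what lets asset functions return plain data without knowing where it is persisted.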
slopp / ecs-agent-vpc.yaml
Created November 7, 2022 14:41
Dagster Cloud ECS Template
AWSTemplateFormatVersion: 2010-09-09
Description: >
  Deploys a Dagster Cloud user agent to a new or existing ECS cluster in a new VPC.
  Version DAGSTER_VERSION.
  See https://docs.dagster.cloud/agents/ecs/setup for more details.
Outputs:
  TemplateVersion:
    Description: The revision of the ECS agent template.
slopp / repository.py
Created November 2, 2022 19:52
Assets vs Ops
# repository.py
from dagster import job, ScheduleDefinition, repository, with_resources
from dagster_fivetran import fivetran_resource, fivetran_sync_op
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op
my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
slopp / assets.py
Last active November 8, 2022 19:10
Modern Data Stack Ops vs Assets
# repository.py
from dagster import job, ScheduleDefinition, repository, with_resources
from dagster_fivetran import fivetran_resource, build_fivetran_assets
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_manifest
my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
slopp / HotTakes.md
Last active June 20, 2023 21:49
Dagster Hot Takes

Dagster Hot Takes

Fewer On-Call Pages: Retries and Alerts

https://youtu.be/A6WtkMwe4VQ

Getting an on-call page is the worst. Unfortunately, most task-based orchestrators page the team every time a job fails, which happens often. With Dagster you can reduce this alert fatigue by using retry strategies and only getting notified when SLAs are violated.
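The retry-then-alert idea can be sketched in plain Python. This is not Dagster's API: in Dagster, retries are typically configured declaratively (for example with a `RetryPolicy`) and alerts are set up separately. Here `run_with_retries` is a hypothetical helper, and exponential backoff starting at `base_delay` seconds is an assumed strategy. The point is that the `alert` callback fires only after every attempt has failed, so a transient error that clears up on retry never pages anyone.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    task: Callable[[], T],
    max_retries: int,
    base_delay: float,
    alert: Callable[[str], None],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry a flaky task with exponential backoff; alert only once every attempt has failed."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == max_retries:
                # Out of retries: this is the only point where a human gets paged.
                alert(f"task failed after {attempt + 1} attempts: {exc}")
                raise
            # Transient failure: wait base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable: the loop always returns or raises")
```

Passing `sleep=delays.append` instead of `time.sleep` is a convenient way to check the backoff schedule without waiting: a task that fails twice and then succeeds with `max_retries=3, base_delay=1.0` records delays of 1.0 and 2.0 seconds and sends no alert.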

Resources: