Skip to content

Instantly share code, notes, and snippets.

@slopp
Last active November 11, 2022 18:27
Show Gist options
  • Save slopp/354427513c907b7a7b3cb504a3f5a74c to your computer and use it in GitHub Desktop.
Save slopp/354427513c907b7a7b3cb504a3f5a74c to your computer and use it in GitHub Desktop.
Example SQLAlchemy Dagster IO Manager
# ---------------------------------------------------
# Run this example with:
# dagit -f dagster_project.py
from dagster import asset, IOManager, with_resources, repository, io_manager, StringSource
import pandas as pd
import random
import sqlalchemy
import os
# ---------------------------------------------------
# IO managers subclass dagster's IOManager and implement handle_output
# (persist an asset's value) and load_input (read it back for a downstream
# asset).
class DBIOManager(IOManager):
    """Persist pandas DataFrame assets as tables in a SQLite database.

    Each asset is stored in a table named after the last component of its
    asset key (in this example ``orders`` and ``combo_orders``). The
    connection goes through SQLAlchemy, so the pattern can be adapted to other
    SQLAlchemy databases whose connection strings need different secrets.
    """

    def __init__(self, sqlite_secret):
        """Create the SQLAlchemy engine.

        Args:
            sqlite_secret: SQLite database location, interpolated into a
                ``sqlite:///`` URL (here supplied via resource config).
        """
        self._dbEngine = sqlalchemy.create_engine(f"sqlite:///{sqlite_secret}")

    @staticmethod
    def _table_name(context):
        """Return the table name for the asset behind ``context``."""
        return context.asset_key.path[-1]

    def handle_output(self, context, obj):
        """Write the asset value ``obj`` to its table, replacing any old table.

        Args:
            context: Dagster output context; its asset key names the table.
            obj: The value returned by the asset function.

        Raises:
            TypeError: If ``obj`` is not a pandas DataFrame.
        """
        if not isinstance(obj, pd.DataFrame):
            # Raising a bare string is itself an error in Python 3, so raise
            # a real exception that says what went wrong.
            raise TypeError(
                "Tried to materialize an asset that is not a pandas dataframe "
                f"(got {type(obj).__name__})"
            )
        obj.to_sql(
            name=self._table_name(context),
            con=self._dbEngine,
            index=False,          # do not store the DataFrame's row index
            if_exists="replace",  # each materialization overwrites the table
        )

    def load_input(self, context) -> pd.DataFrame:
        """Read an upstream asset's table back as a DataFrame.

        For example, when ``combo_orders(orders)`` runs, this loads the
        ``orders`` table so the asset function receives a DataFrame.

        Args:
            context: Dagster input context; its asset key names the table.

        Returns:
            Every row of the table.
        """
        # Quote the identifier (escaping embedded quotes) so table names that
        # are not plain SQL identifiers still form a valid query.
        table = self._table_name(context).replace('"', '""')
        return pd.read_sql(f'select * from "{table}"', self._dbEngine)
# The @io_manager-decorated factory below tells dagster how to instantiate
# the IO manager. Its config_schema declares the resource configuration the
# manager requires; StringSource is a special config type that can also be
# filled from an environment variable, which is how secrets are passed in.
@io_manager(config_schema={"SQLITE_SECRET": StringSource})
def db_io_manager(context):
    """Construct a DBIOManager from the resource's SQLITE_SECRET setting."""
    return DBIOManager(context.resource_config["SQLITE_SECRET"])
# ---------------------------------------------------
# Define assets
@asset
def orders():
    """Produce ten toy orders with randomly drawn customers and products."""
    n_orders = 10
    # Draw customers first, then products (same order as the column layout).
    customer_ids = random.choices([111, 301, 42], k=n_orders)
    product_ids = random.choices(['widget', 'gizmo', 'gadget', 'combo'], k=n_orders)
    columns = {
        "order_id": range(n_orders),
        "customer_id": customer_ids,
        "product_id": product_ids,
    }
    return pd.DataFrame(columns)
@asset
def combo_orders(orders):
    """Return only the rows of the upstream ``orders`` asset sold as combos."""
    is_combo = orders["product_id"] == "combo"
    return orders[is_combo]
# ---------------------------------------------------
# Build our repository
# Provide a default secret so this toy project is self-contained. Normally the
# secret is exported as an environment variable outside this file, so
# setdefault leaves an existing SQLITE_SECRET untouched instead of
# overwriting it.
os.environ.setdefault("SQLITE_SECRET", "example.sqlite")
@repository
def my_dagster_project():
    """Expose the two assets, backed by db_io_manager, as a repository.

    db_io_manager is bound under the "io_manager" key, so it is the default
    IO manager for ``orders`` and ``combo_orders``. Its SQLITE_SECRET config
    is taken from the environment variable of the same name.
    """
    # The secret arrives as configuration sourced from an environment variable.
    configured_manager = db_io_manager.configured(
        {"SQLITE_SECRET": {"env": "SQLITE_SECRET"}}
    )
    bound_assets = with_resources(
        [orders, combo_orders],
        {"io_manager": configured_manager},
    )
    return [bound_assets]
dagster
dagit
sqlalchemy
pandas
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment