Last active
November 11, 2022 18:27
-
-
Save slopp/354427513c907b7a7b3cb504a3f5a74c to your computer and use it in GitHub Desktop.
Example SQLAlchemy Dagster IO Manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --------------------------------------------------- | |
# Run this example with: | |
# dagit -f dagster_project.py | |
from dagster import asset, IOManager, with_resources, repository, io_manager, StringSource | |
import pandas as pd | |
import random | |
import sqlalchemy | |
import os | |
# --------------------------------------------------- | |
# IO managers subclass dagster's IOManager class and
# implement load_input and handle_output
class DBIOManager(IOManager):
    """IO manager that stores pandas DataFrames as tables in a SQLite database.

    Each asset is written to, and read back from, a table named after the
    last component of its asset key (in this file "orders" or
    "combo_orders").
    """

    # we use the resource config sqlite_secret
    # this pattern can be adapted for sqlalchemy
    # databases that use other secrets when connecting
    def __init__(self, sqlite_secret):
        """Create the SQLAlchemy engine used for every read and write.

        Args:
            sqlite_secret: value interpolated into the ``sqlite:///`` URL,
                i.e. the location of the SQLite database file.
        """
        self._dbEngine = sqlalchemy.create_engine(f"sqlite:///{sqlite_secret}")

    @staticmethod
    def _table_name(context):
        """Return the table name for *context*: the last part of its asset key."""
        return context.asset_key.path[-1]

    # handle_output takes an object (what is returned by the asset function)
    # and serializes it
    def handle_output(self, context, obj):
        """Write *obj* to the table named after the asset, replacing it if present.

        Args:
            context: dagster output context; only ``asset_key.path`` is read.
            obj: the value returned by the asset function.

        Raises:
            TypeError: if *obj* is not a pandas DataFrame.
        """
        if not isinstance(obj, pd.DataFrame):
            # A bare string cannot be raised in Python 3 (it fails with
            # "exceptions must derive from BaseException"), so raise a real
            # TypeError that carries the intended message.
            raise TypeError(
                "Tried to materialize an asset that is not a pandas dataframe "
                f"(got {type(obj).__name__})"
            )
        # here we use the pandas sqlalchemy integration to
        # write the dataframe to a table
        obj.to_sql(
            # the table will be named by the asset_key, in this case "orders"
            # or "combo_orders"
            name=self._table_name(context),
            con=self._dbEngine,
            index=False,
            if_exists="replace",
        )

    # load_input takes an asset_key and is responsible for making it
    # available to an asset function
    #
    # in this example, load_input is called for combo_orders(orders)
    # and is responsible for making orders available as a dataframe
    def load_input(self, context) -> pd.DataFrame:
        """Read the whole table named after the asset back as a DataFrame.

        Args:
            context: dagster input context; only ``asset_key.path`` is read.

        Returns:
            The result of ``select *`` on that table.
        """
        # A table name is an identifier and cannot be bound as a query
        # parameter, so it is interpolated into the statement.
        # NOTE(review): assumes asset keys are defined in code and never
        # come from untrusted input - confirm before reusing elsewhere.
        table = self._table_name(context)
        return pd.read_sql(f"select * from {table}", self._dbEngine)
# this io_manager-decorated function tells dagster
# to instantiate the io manager
# | |
# the config_schema defines the required resource | |
# configuration | |
# | |
# StringSource is a special type that works with | |
# secrets passed in from the environment | |
@io_manager(config_schema={"SQLITE_SECRET": StringSource})
def db_io_manager(context):
    """Build a DBIOManager from the "SQLITE_SECRET" resource config value."""
    return DBIOManager(context.resource_config["SQLITE_SECRET"])
# --------------------------------------------------- | |
# Define assets | |
@asset
def orders():
    """Return a DataFrame of ten orders with random customers and products."""
    row_count = 10
    # draw customers first, then products, so the random stream is
    # consumed in a fixed order
    customer_ids = random.choices([111, 301, 42], k=row_count)
    product_ids = random.choices(['widget', 'gizmo', 'gadget', 'combo'], k=row_count)
    columns = {
        "order_id": range(row_count),
        "customer_id": customer_ids,
        "product_id": product_ids,
    }
    return pd.DataFrame(columns)
@asset
def combo_orders(orders):
    """Return only the rows of *orders* whose product_id is 'combo'."""
    is_combo = orders['product_id'] == 'combo'
    return orders[is_combo]
# --------------------------------------------------- | |
# Build our repository | |
# this line ensures our toy project is self-contained | |
# but normally secrets would be set as environment | |
# variables outside of this file | |
# point the SQLITE_SECRET environment variable at a local example database
os.environ.update(SQLITE_SECRET="example.sqlite")
@repository
def my_dagster_project():
    """Return the repository contents: both assets bound to db_io_manager."""
    # our secret is added as configuration
    # coming from an environment variable
    configured_io_manager = db_io_manager.configured(
        {"SQLITE_SECRET": {"env": "SQLITE_SECRET"}}
    )
    # here we tell dagster to use our db_io_manager
    # as the default "io_manager" for the two assets
    resource_defs = {"io_manager": configured_io_manager}
    return [with_resources([orders, combo_orders], resource_defs)]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dagster | |
dagit | |
sqlalchemy | |
pandas |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment