Created
January 23, 2024 16:01
-
-
Save slopp/9cc7b6462b7aa958eb0687e0ef7c94c4 to your computer and use it in GitHub Desktop.
Dagster with a custom DSL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
profile:
  name: FN
stocks_to_index:
  - ticker: NFLX
  - ticker: META
index_strategy:
  type: equal
forecast:
  days: 60
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
profile:
  name: MAG
stocks_to_index:
  - ticker: MSFT
  - ticker: AAPL
  - ticker: GOOG
index_strategy:
  type: weighted_average
forecast:
  days: 30
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
from typing import Any, Dict, List, NamedTuple | |
import yaml | |
from dagster._core.execution.context.compute import AssetExecutionContext | |
try: | |
from yaml import CLoader as Loader | |
except ImportError: | |
from yaml import Loader | |
from dagster import AssetKey, AssetsDefinition, asset, file_relative_path, multi_asset, Definitions | |
from dagster._core.definitions.asset_spec import AssetSpec | |
from dagster._core.pipes.subprocess import PipesSubprocessClient | |
def load_yaml(relative_path: str) -> Dict[str, Any]:
    """Parse a YAML file that lives next to this module.

    Args:
        relative_path: Location of the YAML file, relative to the directory
            that contains this source file.

    Returns:
        The object produced by deserializing the YAML document.
    """
    module_dir = os.path.dirname(__file__)
    yaml_file = os.path.join(module_dir, relative_path)
    # NOTE(review): ``Loader`` is PyYAML's full (non-safe) loader. That is
    # acceptable for profile files shipped with the project, but it should not
    # be pointed at untrusted input -- consider ``SafeLoader`` if that changes.
    with open(yaml_file, "r", encoding="utf8") as stream:
        document = yaml.load(stream, Loader=Loader)
    return document
def get_ticker_data(ticker: str) -> str:
    """Return placeholder raw data for ``ticker``.

    Stand-in for a call that would fetch the data from an external service;
    here it simply yields the ticker followed by ``-data``.
    """
    raw_data = f"{ticker}-data"
    return raw_data
def enrich_and_insert_data(ticker_data) -> None:
    """Placeholder for enriching ``ticker_data`` and storing it in the database.

    The example implementation is intentionally a no-op.
    """
    return None
def fetch_data_for_ticker(ticker: str) -> str:
    """Return placeholder enriched data for ``ticker``.

    Stand-in for a database read; here it simply yields the ticker followed
    by ``-data-enriched``.
    """
    enriched = f"{ticker}-data-enriched"
    return enriched
class StockInfo(NamedTuple):
    """One stock entry taken from the DSL's ``stocks_to_index`` list."""

    # Ticker symbol exactly as written in the DSL document.
    ticker: str
class IndexStrategy(NamedTuple):
    """The ``index_strategy`` section of the DSL document."""

    # Strategy identifier exactly as written in the DSL document.
    type: str
class Forecast(NamedTuple):
    """The ``forecast`` section of the DSL document."""

    # Value of the section's ``days`` entry.
    days: int
class Profile(NamedTuple):
    """The ``profile`` section of the DSL document."""

    # Profile name exactly as written in the DSL document.
    name: str
class StockAssets(NamedTuple):
    """Typed view of one complete stocks DSL document."""

    # One entry per item of the ``stocks_to_index`` list.
    stock_infos: List[StockInfo]
    # Contents of the ``index_strategy`` section.
    index_strategy: IndexStrategy
    # Contents of the ``forecast`` section.
    forecast: Forecast
    # Contents of the ``profile`` section.
    profile: Profile
def build_stock_assets_object(stocks_dsl_document: Dict[str, Dict]) -> StockAssets:
    """Convert a parsed stocks DSL document into a ``StockAssets`` value.

    Args:
        stocks_dsl_document: Parsed document providing ``stocks_to_index``
            (a sequence of blocks, each with a ``ticker``),
            ``index_strategy`` (with ``type``), ``forecast`` (with ``days``)
            and ``profile`` (with ``name``).

    Returns:
        The document's contents as a ``StockAssets`` tuple; ``days`` is
        converted with ``int``.
    """
    stock_infos = []
    for stock_block in stocks_dsl_document["stocks_to_index"]:
        stock_infos.append(StockInfo(ticker=stock_block["ticker"]))

    strategy_type = stocks_dsl_document["index_strategy"]["type"]
    forecast_days = int(stocks_dsl_document["forecast"]["days"])
    profile_name = stocks_dsl_document["profile"]["name"]

    return StockAssets(
        stock_infos=stock_infos,
        index_strategy=IndexStrategy(type=strategy_type),
        forecast=Forecast(forecast_days),
        profile=Profile(profile_name),
    )
def get_stocks_dsl_example_defs(yaml_path: str) -> List[AssetsDefinition]:
    """Load a stocks DSL file and turn it into asset definitions.

    Args:
        yaml_path: Path of the DSL YAML file, relative to this module's
            directory (it is handed to ``load_yaml``).

    Returns:
        The asset definitions built by ``assets_defs_from_stock_assets``.
    """
    return assets_defs_from_stock_assets(
        build_stock_assets_object(load_yaml(yaml_path))
    )
def assets_defs_from_stock_assets(stock_assets: StockAssets) -> List[AssetsDefinition]:
    """Build the Dagster asset definitions for one stock profile.

    Three definitions are produced, all assigned to the asset group named
    after the profile:

    1. ``fetch_the_tickers`` -- a multi-asset with one asset per ticker, each
       keyed by the bare ticker symbol.
    2. ``index_strategy`` -- depends on every ticker asset and records the
       configured strategy type as output metadata.
    3. ``forecast`` -- depends on ``index_strategy`` and records the
       configured number of forecast days as output metadata.

    ``index_strategy`` and ``forecast`` are key-prefixed with the profile name.

    Args:
        stock_assets: Typed contents of one stocks DSL document.

    Returns:
        ``[fetch_the_tickers, index_strategy, forecast]`` in that order.
    """
    group_name = stock_assets.profile.name
    tickers = [stock_info.ticker for stock_info in stock_assets.stock_infos]
    forecast_days = stock_assets.forecast.days
    index_strategy_type = stock_assets.index_strategy.type

    # NOTE(review): ticker asset keys carry no profile prefix, so two profiles
    # listing the same ticker would presumably produce clashing asset keys --
    # confirm before sharing tickers across profiles.
    ticker_specs = [
        AssetSpec(
            key=AssetKey([ticker]),
            group_name=group_name,
            description=f"Fetch {ticker} from internal service",
        )
        for ticker in tickers
    ]

    @multi_asset(
        specs=ticker_specs,
        name=f"{group_name}_fetch_the_tickers",
    )
    def fetch_the_tickers(context: AssetExecutionContext):
        # fetch the raw data
        context.log.info(f"Got tickers: {tickers}")

    @asset(
        deps=fetch_the_tickers.keys,
        group_name=group_name,
        key_prefix=group_name,
    )
    def index_strategy(context: AssetExecutionContext) -> None:
        stored_ticker_data = {
            ticker: fetch_data_for_ticker(ticker) for ticker in tickers
        }
        # do something with stored_ticker_data
        context.add_output_metadata({
            "index strategy": index_strategy_type
        })

    @asset(
        # ``deps`` takes a collection of upstream dependencies, so the single
        # key is wrapped in a list instead of being passed bare.
        deps=[index_strategy.key],
        group_name=group_name,
        key_prefix=group_name,
    )
    def forecast(context: AssetExecutionContext) -> None:
        # do some forecast thing
        context.add_output_metadata({
            "forecast days": forecast_days
        })

    return [fetch_the_tickers, index_strategy, forecast]
# Code-location entry point: the asset definitions generated from each
# profile file, in the order the files are listed.
defs = Definitions(
    assets=[
        assets_def
        for profile_yaml in ("mag_profile.yaml", "fn_profile.yaml")
        for assets_def in get_stocks_dsl_example_defs(profile_yaml)
    ],
)
Author
slopp
commented
Jan 23, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment