Skip to content

Instantly share code, notes, and snippets.

@slopp
Created November 29, 2022 18:53
Show Gist options
  • Save slopp/359ba324a4d6ccfb3bf23d0e9554c080 to your computer and use it in GitHub Desktop.
Save slopp/359ba324a4d6ccfb3bf23d0e9554c080 to your computer and use it in GitHub Desktop.
Example Dagster Project for Debugging
from dagster import asset, repository, with_resources
from resources import snow_api
import pandas as pd
@asset(required_resource_keys={"snow_api"})
def snow_forecast(context):
    """Fetch the snow forecast for the "abasin" location.

    Looks up the ``snow_api`` resource on the Dagster context and returns
    whatever its ``get`` method yields for that location.
    """
    api_client = context.resources.snow_api
    return api_client.get(location="abasin")
@asset
def best_pow_day(context, snow_forecast):
    """Pick the row with the largest predicted 24-hour snowfall.

    Args:
        context: Dagster execution context; used for logging and for
            attaching output metadata.
        snow_forecast: Forecast data containing a
            ``predictedSnowFall_24Hours`` field. Either anything
            ``pd.DataFrame`` accepts, or a single record (a dict whose
            values are all scalars).

    Returns:
        A dict with ``"day"`` (index label of the best row) and ``"snow"``
        (that row's predicted snowfall), both as native Python values.

    Raises:
        KeyError: if ``predictedSnowFall_24Hours`` is missing.
        ValueError: if the values cannot be parsed as numbers or there are
            no rows to choose from.
    """
    column = "predictedSnowFall_24Hours"

    def as_builtin(value):
        # numpy scalars expose .item(); plain Python values pass through.
        return value.item() if hasattr(value, "item") else value

    # A single record (dict of scalars) has no implicit index, so
    # pd.DataFrame would reject it; wrap it to get a one-row frame instead.
    is_single_record = (
        isinstance(snow_forecast, dict)
        and bool(snow_forecast)
        and all(pd.api.types.is_scalar(v) for v in snow_forecast.values())
    )
    snow_df = pd.DataFrame([snow_forecast] if is_single_record else snow_forecast)

    # The feed values may arrive as strings; coerce before comparing.
    snowfall = pd.to_numeric(snow_df[column])

    # Series.max() skips missing values, unlike the builtin max(), whose
    # result depends on where a NaN sits in the sequence.
    # NOTE(review): Dagster metadata values appear to require plain Python
    # types (numpy integers are not `int` instances) - confirm against the
    # Dagster metadata docs. Converting is harmless either way.
    best_day = {
        "day": as_builtin(snowfall.idxmax()),
        "snow": as_builtin(snowfall.max()),
    }

    context.log.info(f"Best Day: {best_day['day']}")
    context.add_output_metadata(best_day)
    return best_day
@repository
def snowy():
    """Repository exposing both assets with the ``snow_api`` resource bound.

    Returns a one-element list holding the result of ``with_resources``.
    """
    asset_defs = [snow_forecast, best_pow_day]
    resource_defs = {"snow_api": snow_api}
    bound_assets = with_resources(asset_defs, resource_defs)
    return [bound_assets]
from dagster import resource
import pandas as pd
import requests
class SnowAPI():
    """Small client for the SnoCountry conditions feed.

    Maps short ski-area names to feed ids and fetches the first item the
    feed returns for a given area.
    """

    def __init__(self, timeout: float = 10.0) -> None:
        """Set up the feed URL, the known ski areas and the request timeout.

        Args:
            timeout: Seconds to wait for the feed before giving up.
        """
        # Attribute names are kept unchanged so existing users keep working.
        self.__URL__ = 'http://feeds.snocountry.net/conditions.php?apiKey=SnoCountry.example&ids='
        self.__skiareas__ = {
            "abasin": "303001",
            "eldora": "303011",
        }
        self._timeout = timeout

    def get(self, location) -> dict:
        """Return the first item of the feed response for ``location``.

        Args:
            location: Short ski-area name; must be a key of the known areas.

        Returns:
            The first element of the response's ``items`` list, as parsed
            from JSON (presumably a dict of fields - confirm against the
            feed's actual payload).

        Raises:
            KeyError: if ``location`` is not a known ski area, or the
                response has no ``items`` key.
            IndexError: if the response's ``items`` list is empty.
            requests.RequestException: on network failure, timeout, or a
                non-success HTTP status.
        """
        try:
            resort_id = self.__skiareas__[location]
        except KeyError:
            known = ", ".join(sorted(self.__skiareas__))
            raise KeyError(
                f"Unknown ski area {location!r}; known areas: {known}"
            ) from None

        url = f"{self.__URL__}{resort_id}"
        # Without a timeout, requests can block indefinitely on a stalled server.
        response = requests.get(url, timeout=self._timeout)
        # Surface HTTP errors directly instead of failing later while parsing.
        response.raise_for_status()
        return response.json()['items'][0]
@resource
def snow_api():
    """Dagster resource that builds a new ``SnowAPI`` client."""
    client = SnowAPI()
    return client
from project import best_pow_day, snow_forecast
from dagster import build_op_context
from resources import snow_api
def test_snow():
    """Run both assets directly and check the final result is truthy.

    Builds a context with the ``snow_api`` resource, feeds the output of
    ``snow_forecast`` into ``best_pow_day``, and asserts on the result.
    """
    op_context = build_op_context(resources={"snow_api": snow_api})
    fetched = snow_forecast(op_context)
    result = best_pow_day(op_context, fetched)
    assert result
#test_snow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment