Skip to content

Instantly share code, notes, and snippets.

@dwinston
Created June 23, 2021 14:45
Show Gist options
  • Save dwinston/ebe4ab0c36a9d646c3cd8a6c77eb6644 to your computer and use it in GitHub Desktop.
Save dwinston/ebe4ab0c36a9d646c3cd8a6c77eb6644 to your computer and use it in GitHub Desktop.
Using a Dagster resource inside a sensor via a preset definition's run config
from dagster import (
ModeDefinition, PresetDefinition, resource, StringSource,
build_init_resource_context, RunRequest, sensor,
)
class ApiClient:
    """Container for the settings needed to talk to the API.

    Each constructor argument is stored unchanged on the instance under
    the attribute of the same name.
    """

    def __init__(self, base_url: str, site_id: str, client_id: str, client_secret: str):
        """Store the connection settings.

        Args:
            base_url: Value kept as ``self.base_url``.
            site_id: Value kept as ``self.site_id``.
            client_id: Value kept as ``self.client_id``.
            client_secret: Value kept as ``self.client_secret``.
        """
        self.base_url = base_url
        self.site_id = site_id
        self.client_id = client_id
        self.client_secret = client_secret

    def ok(self) -> bool:
        """Placeholder check; currently always returns ``False``.

        Returns:
            ``False`` until a real check is implemented. An explicit bool
            is returned so the result matches the ``-> bool`` annotation
            while remaining falsy for existing callers.
        """
        # Do awesome resource stuff here
        return False
@resource(
    config_schema={
        "base_url": StringSource,
        "site_id": StringSource,
        "client_id": StringSource,
        "client_secret": StringSource,
    }
)
def api_resource(context):
    """Build an ``ApiClient`` from this resource's config.

    Reads ``base_url``, ``site_id``, ``client_id`` and ``client_secret``
    from ``context.resource_config`` and passes them to ``ApiClient`` as
    keyword arguments of the same names.
    """
    settings = context.resource_config
    fields = ("base_url", "site_id", "client_id", "client_secret")
    return ApiClient(**{field: settings[field] for field in fields})
# The "normal" mode makes `api_resource` available under the resource key "api".
mode_normal = ModeDefinition(name="normal", resource_defs={"api": api_resource})
# Preset for the "normal" mode whose "api" resource config takes every field
# from an environment variable, written as {"env": "<VARIABLE NAME>"}.
preset_normal_env = PresetDefinition(
    "normal_via_env",
    mode="normal",
    run_config={
        "resources": {
            "api": {
                "config": {
                    field: {"env": env_var}
                    for field, env_var in (
                        ("base_url", "API_HOST"),
                        ("site_id", "API_SITE_ID"),
                        ("client_id", "API_SITE_CLIENT_ID"),
                        ("client_secret", "API_SITE_CLIENT_SECRET"),
                    )
                },
            },
        },
    },
)
@sensor(pipeline_name="my_pipeline", mode="normal")
def my_sensor(_context):
    """Yield a run request for ``my_pipeline`` when the API client reports ok.

    The client is created by invoking ``api_resource`` with an init
    resource context built from the "api" config stored in
    ``preset_normal_env``. When ``client.ok()`` is truthy, a single
    ``RunRequest`` (``run_key=None``) carrying the preset's full run
    config is yielded; otherwise nothing is yielded.
    """
    api_config = preset_normal_env.run_config["resources"]["api"]["config"]
    api = api_resource(build_init_resource_context(config=api_config))
    if not api.ok():
        return
    yield RunRequest(run_key=None, run_config=preset_normal_env.run_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment