Skip to content

Instantly share code, notes, and snippets.

@slopp
Last active April 14, 2023 20:51
Show Gist options
  • Save slopp/500fba26b39e33265cfbc0dd1f60402c to your computer and use it in GitHub Desktop.
Save slopp/500fba26b39e33265cfbc0dd1f60402c to your computer and use it in GitHub Desktop.
Example of Fivetran assets with a downstream Databricks asset in Dagster
import json
import os

from dagster import (
    AssetIn,
    AssetKey,
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    define_asset_job,
    with_resources,
)
from dagster_databricks import databricks_client
from dagster_fivetran import build_fivetran_assets, fivetran_resource
from databricks_cli.sdk import JobsService
# Fivetran resource whose credentials come from Dagster's ``{"env": ...}``
# config source (FIVETRAN_API_KEY / FIVETRAN_API_SECRET environment variables).
fivetran_instance = fivetran_resource.configured(
    dict(
        api_key={"env": "FIVETRAN_API_KEY"},
        api_secret={"env": "FIVETRAN_API_SECRET"},
    )
)
# Assets for a single Fivetran connector, with the configured Fivetran
# resource bound to them under the "fivetran" resource key.
# NOTE(review): "your_connector" is a placeholder connector id — replace it.
fivetran_assets = with_resources(
    build_fivetran_assets(
        connector_id="your_connector",
        destination_tables=["fivetran_destination_table"],
    ),
    resource_defs={"fivetran": fivetran_instance},
)
# Databricks client resource; host and token are pulled from the
# DATABRICKS_HOST / DATABRICKS_TOKEN environment variables via Dagster config.
databricks_client_instance = databricks_client.configured(
    dict(
        host={"env": "DATABRICKS_HOST"},
        token={"env": "DATABRICKS_TOKEN"},
    )
)
@asset(
    non_argument_deps={"fivetran_destination_table"},
    required_resource_keys={"databricks"},
)
def analytics_table(context):
    """Run a Databricks job and block until it finishes.

    The asset declares a non-argument dependency on the Fivetran-managed
    ``fivetran_destination_table``, so Dagster orders it after that table
    without loading any data into this function.

    The Databricks job to launch is read at run time from the
    ``DATABRICKS_JOB_ID`` environment variable. This mirrors how the rest of
    this file sources its configuration from the environment.

    Args:
        context: Dagster asset execution context. Supplies the ``databricks``
            resource and the run logger.

    Raises:
        KeyError: if ``DATABRICKS_JOB_ID`` is not set.
        ValueError: if ``DATABRICKS_JOB_ID`` is not an integer.
    """
    databricks = context.resources.databricks
    jobs_service = JobsService(databricks.api_client)

    # Read at run time (not import time) so the module loads without it.
    job_id = int(os.environ["DATABRICKS_JOB_ID"])
    run_id: int = jobs_service.run_now(job_id=job_id)["run_id"]

    get_run_response: dict = jobs_service.get_run(run_id=run_id)
    context.log.info(
        f"Launched databricks job run for '{get_run_response['run_name']}' (`{run_id}`). URL:"
        f" {get_run_response['run_page_url']}. Waiting for run to complete."
    )

    # Poll every 10 seconds for at most 5 minutes; what happens on timeout is
    # decided by dagster_databricks' wait_for_run_to_complete.
    databricks.wait_for_run_to_complete(
        logger=context.log,
        databricks_run_id=run_id,
        poll_interval_sec=10,
        max_wait_time_sec=60 * 5,
    )
# Job that materializes ``analytics_table`` together with every asset upstream
# of it. ``AssetSelection.keys`` is the key-based selector; ``AssetSelection``
# has no ``asset`` method.
my_etl_job = define_asset_job(
    "my_etl_job",
    selection=AssetSelection.keys("analytics_table").upstream(),
)
# Top-level Dagster definitions: the Fivetran assets, the downstream
# Databricks asset, a daily schedule for the ETL job, and the Databricks
# client resource required by ``analytics_table``.
defs = Definitions(
    # with_resources returns a sequence of asset definitions; unpack it so
    # ``assets`` is a flat list rather than containing a nested list.
    assets=[*fivetran_assets, analytics_table],
    schedules=[
        ScheduleDefinition(
            job=my_etl_job,
            cron_schedule="@daily",
        ),
    ],
    resources={"databricks": databricks_client_instance},
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment