@cnolanminich
Last active July 26, 2024 16:32
Use dlt open api codegen to create Dagster pipeline
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__BASE_URL=https://api.github.com
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__ACCESS_TOKEN={your_token_here}
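
The two variables above appear to follow dlt's environment-variable naming, in which the configuration sections are upper-cased and joined with double underscores. A minimal sketch of that mapping follows; `env_key` is a hypothetical helper written for illustration and is not part of dlt, which resolves these keys on its own.

```python
def env_key(*sections: str) -> str:
    """Join config sections into an upper-case key separated by double underscores."""
    return "__".join(section.upper() for section in sections)


# Hypothetical helper for illustration only; the section names are taken from
# the variables shown above.
base_url_key = env_key("locations_data_2", "sources", "github_from_openapi", "base_url")
print(base_url_key)
```

Reading a key this way makes it easier to see which part names the source (`github_from_openapi`) and which part names the setting (`base_url`, `access_token`).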

Steps to reproduce:

  • uv venv; uv pip install dagster dagster-webserver dagster-embedded-elt "dlt[duckdb]" dlt-init-openapi
  • dagster project scaffold --name dagster_dlt_with_open_api
  • mkdir dlt_sources; mkdir github_from_openapi
  • cd dlt_sources
  • dlt-init-openapi github_from_openapi --url https://raw.githubusercontent.com/github/rest-api-description/main/descriptions/api.github.com/api.github.com.json
  • Find and replace each "FILL_ME_IN" with values that make sense (e.g., dagster-io for "org"; dagster for "repo")
  • Note: this is a huge REST API, and not all endpoints make sense to load. For this demonstration I loaded all ~600 endpoints into Dagster, but for a real use case you would pick only the endpoints you need
  • Run dagster dev with the following in your definitions.py file
  • To test, materialize an asset such as dlt_rest_api_resources_activitylist_stargazers_for_repo
import os
from pathlib import Path

import dlt
from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt_sources.github_from_openapi_pipeline.github_from_openapi import github_from_openapi_source

# Assumption: DBT_PROJECT_DIR was not defined in the original snippet.
# Point it at the directory that should hold example.duckdb.
DBT_PROJECT_DIR = Path(".")


@dlt_assets(
    dlt_source=github_from_openapi_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="github_from_openapi_spec",
        dataset_name="github_from_openapi_spec",
        # Write to a DuckDB file; destination="duckdb" also works.
        destination=dlt.destinations.duckdb(os.path.join(DBT_PROJECT_DIR, "example.duckdb")),
        progress="log",
    ),
    name="github_from_openapi_spec",
    group_name="github_from_openapi_spec",
)
def github_from_open_api_spec_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    # Run the dlt pipeline and yield one materialization per loaded resource.
    yield from dlt.run(context=context)


defs = Definitions(
    assets=[github_from_open_api_spec_assets],
    # The asset function requests a resource named "dlt", so it must be registered here.
    resources={"dlt": DagsterDltResource()},
)
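
The find-and-replace step above can be easier with a list of where the placeholders are. The following is a minimal sketch, assuming the generated files are plain text under a single directory; `find_placeholders` is a hypothetical helper, not part of dlt or dlt-init-openapi.

```python
from pathlib import Path


def find_placeholders(root: str, marker: str = "FILL_ME_IN") -> list[tuple[str, int, str]]:
    """Return (file path, line number, stripped line) for every line containing marker."""
    hits = []
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file():
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            continue  # skip binary or unreadable files
        for lineno, line in enumerate(text.splitlines(), start=1):
            if marker in line:
                hits.append((str(path), lineno, line.strip()))
    return hits
```

Calling `find_placeholders("dlt_sources")` from the project root would return every remaining placeholder, so you can decide per line whether it needs an org, a repo, or some other value.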