@cnolanminich
Last active July 26, 2024 14:01
Example: loading data from S3 into DuckDB with dlt and Dagster
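
# Assumed project layout (inferred from the imports and relative paths below;
# the gist itself does not state it, and "my_project" is a placeholder name):
#
#   repo/
#     my_project/
#       definitions.py              # Dagster Definitions entry point
#       assets/
#         dlt_assets.py             # @dlt_assets definition
#     dlt_sources/
#       filesystem_pipeline.py      # dlt source reading CSVs from S3
#     dbt_project/
#       example.duckdb              # DuckDB file the pipeline writes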
# definitions.py
from dagster import Definitions
from dagster_embedded_elt.dlt import DagsterDltResource

from .assets.dlt_assets import dagster_s3_assets

defs = Definitions(
    assets=[dagster_s3_assets],
    # The dlt resource must be supplied so the "dlt" parameter of
    # dagster_s3_assets resolves at runtime.
    resources={"dlt": DagsterDltResource()},
)
# assets/dlt_assets.py
import os
from pathlib import Path

import dlt
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dlt_sources.filesystem_pipeline import s3_locations_data

# Resolve the dbt project directory so the DuckDB file lands inside it.
dbt_project_path = Path(__file__).parent.parent.parent.joinpath("dbt_project")
DBT_PROJECT_DIR = os.fspath(dbt_project_path)


@dlt_assets(
    dlt_source=s3_locations_data(
        s3_bucket_name="hooli-demo",
        s3_prefix="embedded-elt/",
    ),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="locations_data",
        dataset_name="locations",
        destination=dlt.destinations.duckdb(
            os.path.join(DBT_PROJECT_DIR, "example.duckdb")
        ),
        progress="log",
    ),
    name="locations",
    group_name="dlt_testing",
)
def dagster_s3_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
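
# in a hypothetical scratch script (not part of the original gist): after the
# "locations" asset is materialized, inspect the loaded rows. The table name
# locations.locations_data assumes dlt's default dataset_name.resource_name
# naming; adjust the DuckDB path if your layout differs.
import os

import duckdb

conn = duckdb.connect(os.path.join("dbt_project", "example.duckdb"))
print(conn.sql("SELECT * FROM locations.locations_data LIMIT 5").df())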
# in dlt_sources/filesystem_pipeline.py
from io import StringIO

import boto3
import dlt
import pandas as pd


@dlt.source
def s3_locations_data(s3_bucket_name: str, s3_prefix: str) -> dlt.sources.DltResource:
    """Reads CSV files from an S3 bucket and yields them as Pandas DataFrames.

    Args:
        s3_bucket_name (str): The name of the S3 bucket.
        s3_prefix (str): The prefix (path) to the CSV files in the S3 bucket.

    Returns:
        DltResource: A resource that yields one DataFrame per CSV file.
    """

    @dlt.resource(primary_key="id", write_disposition="merge")
    def locations_data():
        s3 = boto3.client("s3")
        # List the CSV files under the given prefix.
        response = s3.list_objects_v2(Bucket=s3_bucket_name, Prefix=s3_prefix)
        csv_files = [
            item["Key"]
            for item in response.get("Contents", [])
            if item["Key"].endswith(".csv")
        ]
        # Download each file and yield its rows as a DataFrame; with
        # write_disposition="merge", dlt deduplicates on the "id" primary key.
        for csv_file in csv_files:
            obj = s3.get_object(Bucket=s3_bucket_name, Key=csv_file)
            csv_content = obj["Body"].read().decode("utf-8")
            df = pd.read_csv(StringIO(csv_content))
            yield df

    return locations_data
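
# A minimal standalone run (a sketch, not in the original gist): exercise the
# source with dlt alone, outside Dagster. Assumes AWS credentials are available
# to boto3, e.g. via environment variables or ~/.aws/credentials.
if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="locations_data",
        dataset_name="locations",
        destination="duckdb",
    )
    load_info = pipeline.run(
        s3_locations_data(s3_bucket_name="hooli-demo", s3_prefix="embedded-elt/")
    )
    print(load_info)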