Last active
July 26, 2024 14:01
-
-
Save cnolanminich/478e36226b9944d3f7e93aaabc15b02c to your computer and use it in GitHub Desktop.
Example dlt loading from s3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import Definitions

from .assets.dlt_assets import dagster_s3_assets

# Dagster code-location entry point: expose the dlt-backed S3 assets
# so Dagster can discover and materialize them.
defs = Definitions(
    assets=[dagster_s3_assets],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
import dlt
from dlt_sources.filesystem_pipeline import s3_locations_data
from pathlib import Path
import os

# import pipeline
# Resolve the dbt project directory relative to this file; the duckdb
# destination file is written inside it.
dbt_project_path = Path(__file__).parent.parent.parent.joinpath("dbt_project")
DBT_PROJECT_DIR = os.fspath(dbt_project_path)


@dlt_assets(
    dlt_source=s3_locations_data(
        s3_bucket_name="hooli-demo",
        s3_prefix="embedded-elt/",
    ),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="locations_data",
        dataset_name="locations",
        # Load into the dbt project's duckdb file so downstream dbt models
        # can query the landed tables.  #"duckdb",
        destination=dlt.destinations.duckdb(
            os.path.join(DBT_PROJECT_DIR, "example.duckdb")
        ),
        progress="log",
    ),
    name="locations",
    group_name="dlt_testing",
)
def dagster_s3_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    """Run the dlt pipeline and yield Dagster materialization events.

    NOTE(review): the ``dlt`` parameter shadows the imported ``dlt`` module
    inside this function body. The name is kept because Dagster binds the
    resource by parameter name — renaming it would change the required
    resource key.
    """
    yield from dlt.run(context=context)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# in dlt_sources | |
import os | |
import posixpath | |
from typing import Iterator, Sequence | |
import dlt | |
from dlt.sources import TDataItems, DltResource | |
import boto3 | |
import duckdb | |
import pandas as pd | |
from io import StringIO | |
@dlt.source
def s3_locations_data(s3_bucket_name: str, s3_prefix: str) -> dlt.sources.DltResource:
    """dlt source that streams location CSV files from an S3 bucket.

    Args:
        s3_bucket_name (str): The name of the S3 bucket.
        s3_prefix (str): The key prefix (path) under which the CSV files live.

    Returns:
        DltResource: A resource that yields one pandas DataFrame per CSV
        file found under the prefix, merged into the destination on the
        ``id`` primary key.
    """

    @dlt.resource(primary_key="id", write_disposition="merge")
    def locations_data():
        s3 = boto3.client('s3')
        # Use a paginator: a single list_objects_v2 call returns at most
        # 1000 keys, which would silently drop files in larger buckets.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=s3_bucket_name, Prefix=s3_prefix):
            for item in page.get('Contents', []):
                key = item['Key']
                if not key.endswith('.csv'):
                    continue
                obj = s3.get_object(Bucket=s3_bucket_name, Key=key)
                csv_content = obj['Body'].read().decode('utf-8')
                # One DataFrame per file; dlt normalizes and merges the rows.
                yield pd.read_csv(StringIO(csv_content))

    return locations_data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment