Serverless ETL: Load CSV from S3 into Aurora Using DuckDB in AWS Lambda

Introduction

Building a serverless ETL pipeline that efficiently loads structured data into Amazon Aurora is a common requirement. AWS Lambda, combined with DuckDB, provides a powerful way to perform in-memory analytics and bulk insert data into Aurora.

This guide demonstrates how to:

  • Read a CSV file from Amazon S3
  • Process the data in-memory using DuckDB
  • Securely connect to Aurora using IAM authentication
  • Perform efficient batch inserts into Aurora

This solution is optimized for performance, scalability, and security.


Prerequisites

AWS Services Required

  • Amazon S3: Stores the CSV file.
  • AWS Lambda: Executes the ETL logic.
  • Amazon Aurora (MySQL/PostgreSQL): Stores the transformed data.

Dependencies for AWS Lambda

Since AWS Lambda does not include DuckDB by default, it must be packaged as a Lambda layer.

To create the layer:

mkdir python
# DuckDB ships as a compiled wheel, so build the layer for the Lambda runtime's
# architecture (or run this on Amazon Linux / in a Linux container):
pip install duckdb --only-binary=:all: --platform manylinux2014_x86_64 -t python/
zip -r duckdb_layer.zip python

Upload duckdb_layer.zip as an AWS Lambda Layer and attach it to your Lambda function.


Step 1: Set IAM Permissions for Lambda

To allow Lambda to access S3 and Aurora, attach the following IAM policy to the Lambda function role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "rds-db:connect"],
      "Resource": [
        "arn:aws:s3:::my-bucket/*",
        "arn:aws:rds-db:us-east-1:123456789012:dbuser:mydb/iam_user"
      ]
    }
  ]
}

In the rds-db ARN, the segment before the database user name must be the cluster's resource ID (shown as "Resource id" in the RDS console), not the database name.

Step 2: Lambda Function to Load CSV and Insert into Aurora

The following Python code for AWS Lambda reads a CSV file from S3, processes it using DuckDB, and bulk inserts it into Amazon Aurora.

Full Lambda Code

import boto3
import duckdb
import os

# AWS Configuration
S3_BUCKET = "my-bucket"
S3_FILE_KEY = "data.csv"
DB_HOST = "aurora-cluster.cluster-xyz.us-east-1.rds.amazonaws.com"
DB_NAME = "mydb"
DB_USER = "iam_user"
DB_PORT = 3306

# AWS Clients
s3_client = boto3.client("s3")
rds_client = boto3.client("rds")

def lambda_handler(event, context):
    try:
        # Generate IAM authentication token for Aurora
        token = rds_client.generate_db_auth_token(
            DBHostname=DB_HOST,
            Port=DB_PORT,
            DBUsername=DB_USER
        )

        # Download CSV file from S3
        file_path = f"/tmp/{S3_FILE_KEY}"
        s3_client.download_file(S3_BUCKET, S3_FILE_KEY, file_path)

        # Load CSV data into DuckDB
        con = duckdb.connect(database=":memory:")
        con.execute(f"CREATE TABLE temp AS SELECT * FROM read_csv_auto('{file_path}')")

        # Process data (example: keep only active users) and count them for the response
        row_count = con.execute(
            "SELECT COUNT(*) FROM temp WHERE active = 1"
        ).fetchone()[0]

        # Connect DuckDB to Aurora using the MySQL extension.
        # Extensions are downloaded into DuckDB's home directory, which must be
        # writable inside Lambda, so point it at /tmp first.
        con.execute("SET home_directory='/tmp'")
        con.execute("INSTALL mysql")
        con.execute("LOAD mysql")
        # The IAM token is passed as the MySQL password.
        con.execute(
            f"ATTACH 'host={DB_HOST} port={DB_PORT} user={DB_USER} "
            f"password={token} database={DB_NAME}' AS aurora (TYPE MYSQL)"
        )

        # Perform a single set-based insert of the filtered rows into Aurora
        con.execute("INSERT INTO aurora.users SELECT name, email FROM temp WHERE active = 1")

        return {"statusCode": 200, "body": f"Inserted {row_count} rows into Aurora"}

    except Exception as e:
        return {"statusCode": 500, "body": str(e)}

Explanation of the Code

Step 1: Secure IAM Authentication

  • Uses generate_db_auth_token() to obtain a short-lived authentication token for Aurora.
  • No credentials are hardcoded in the function; the database user itself must be created with IAM authentication enabled (see the sketch below).
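
For IAM authentication to work, the database user named in the token must exist in Aurora and be set up for the AWS authentication plugin, with grants on the target table. A minimal sketch for Aurora MySQL, run once as the master user (the user name and grants are assumptions matching this guide's placeholders):

CREATE USER 'iam_user'@'%' IDENTIFIED WITH AWSAuthenticationPlugin AS 'RDS';
GRANT SELECT, INSERT ON mydb.users TO 'iam_user'@'%';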

Step 2: Fetch CSV from S3

  • Downloads the file from Amazon S3 into Lambda’s /tmp storage; alternatively, DuckDB can read the object from S3 directly (see the sketch below).
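
If the /tmp copy is unnecessary, DuckDB can also scan the object in place through its httpfs extension. A minimal sketch, assuming the Lambda role's temporary credentials are available in the standard environment variables and the function has outbound internet access to download the extension (bucket and key are the same placeholders used above):

import os
import duckdb

con = duckdb.connect(database=":memory:")
con.execute("SET home_directory='/tmp'")  # extensions need a writable home in Lambda
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# Pass the Lambda role's temporary credentials to DuckDB
con.execute(f"SET s3_region='{os.environ.get('AWS_REGION', 'us-east-1')}'")
con.execute(f"SET s3_access_key_id='{os.environ['AWS_ACCESS_KEY_ID']}'")
con.execute(f"SET s3_secret_access_key='{os.environ['AWS_SECRET_ACCESS_KEY']}'")
con.execute(f"SET s3_session_token='{os.environ['AWS_SESSION_TOKEN']}'")

# Read the CSV straight from S3, no local copy needed
con.execute("CREATE TABLE temp AS SELECT * FROM read_csv_auto('s3://my-bucket/data.csv')")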

Step 3: Load CSV into DuckDB

  • Uses read_csv_auto() to load CSV directly into an in-memory DuckDB table.

Step 4: Process Data in DuckDB

  • Runs SQL transformations, such as filtering records.

Step 5: Connect to Aurora Using DuckDB’s MySQL Connector

  • Runs the following commands to establish a direct connection (the IAM token serves as the password):
    SET home_directory='/tmp';
    INSTALL mysql;
    LOAD mysql;
    ATTACH 'host={DB_HOST} port={DB_PORT} user={DB_USER} password={token} database={DB_NAME}' AS aurora (TYPE MYSQL);

Step 6: Bulk Insert into Aurora

  • Uses a single set-based statement to insert the filtered records:
    INSERT INTO aurora.users SELECT name, email FROM temp WHERE active = 1;
  • This is far more efficient than inserting rows one at a time.

Performance Optimizations

  • DuckDB in-memory processing: vectorized SQL execution, faster than Pandas-based processing.
  • Batch insert into Aurora: a single INSERT ... SELECT reduces network overhead.
  • Parquet instead of CSV (optional): smaller files and faster scans (see the sketch below).
  • IAM authentication for Aurora: no hardcoded passwords.
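
As a sketch of the optional Parquet optimization, the CSV can be rewritten as Parquet once (or the producer can publish Parquet directly) and then scanned with read_parquet(); the file paths here are illustrative:

import duckdb

con = duckdb.connect(database=":memory:")

# Convert the CSV once; Parquet is columnar, compressed, and faster to scan
con.execute("""
    COPY (SELECT * FROM read_csv_auto('/tmp/data.csv'))
    TO '/tmp/data.parquet' (FORMAT PARQUET)
""")

# Later runs can load the Parquet file directly
con.execute("CREATE TABLE temp AS SELECT * FROM read_parquet('/tmp/data.parquet')")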

Deployment Steps

1. Upload the DuckDB Lambda Layer

  • Build and upload duckdb_layer.zip as shown in the Prerequisites section, then attach it to your Lambda function.
  • The MySQL connector is a DuckDB extension that INSTALL mysql downloads at runtime, so the function needs outbound internet access (for example, a NAT gateway if it runs inside a VPC) unless the extension is bundled into the layer.

2. Set IAM Permissions

  • Apply the IAM policy for S3 and Aurora access as described in Step 1.

3. Adjust Lambda Memory & Timeout

  • Memory: at least 512 MB for DuckDB processing.
  • Timeout: at least 30 seconds to accommodate large inserts.
  • Both settings can be changed in the console, with Terraform (shown later), or programmatically (see the sketch below).
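
A minimal sketch of adjusting both settings with boto3 (the function name DuckDBETL matches the Terraform configuration later in this guide):

import boto3

lambda_client = boto3.client("lambda")

# Give DuckDB enough memory and time for the load
lambda_client.update_function_configuration(
    FunctionName="DuckDBETL",
    MemorySize=512,  # MB
    Timeout=30,      # seconds
)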

Final Summary

  1. Fetch the CSV from S3 and load it into DuckDB.
  2. Process and filter the data in DuckDB.
  3. Authenticate with Aurora securely using IAM.
  4. Bulk insert the data into Aurora.
  5. Optimize performance with batch operations.

This fully serverless approach provides a scalable, cost-effective, and secure ETL pipeline without managing infrastructure.


Conclusion

Using AWS Lambda with DuckDB provides a powerful and efficient way to process structured data and load it into Amazon Aurora. This solution eliminates the need for additional data processing infrastructure while maintaining high performance. It leverages IAM authentication for security and batch inserts for efficiency.

This approach is well-suited for use cases such as real-time analytics, serverless ETL, and log processing. It allows organizations to run complex transformations at scale without additional infrastructure costs.

Automating AWS Lambda, S3, and Aurora Setup with Terraform

This Terraform configuration will:

  • Create an S3 bucket for CSV storage
  • Deploy an AWS Lambda function with a DuckDB layer
  • Attach the necessary IAM permissions for S3 and Aurora
  • Set up Aurora Serverless as the database

1. Prerequisites

Before running Terraform, ensure you have:

  • Terraform installed (>= 1.0)
  • AWS CLI configured
  • Permissions to create S3, Lambda, IAM, and RDS resources (the CSV bucket itself is created by the configuration below)

2. Terraform Configuration

2.1 Provider Configuration

Create a file called provider.tf:

provider "aws" {
  region = "us-east-1"
}

2.2 Create an S3 Bucket

Create a file called s3.tf:

resource "aws_s3_bucket" "csv_bucket" {
  bucket = "my-duckdb-etl-bucket"
}

resource "aws_s3_bucket_public_access_block" "csv_bucket_block" {
  bucket                  = aws_s3_bucket.csv_bucket.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

2.3 Create an IAM Role for Lambda

Create a file called iam.tf:

resource "aws_iam_role" "lambda_role" {
  name = "lambda_duckdb_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

resource "aws_iam_policy" "lambda_s3_aurora" {
  name        = "lambda_s3_aurora_policy"
  description = "Policy to allow Lambda access to S3 and Aurora"
  policy      = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow"
        Action = ["s3:GetObject"],
        Resource = "arn:aws:s3:::my-duckdb-etl-bucket/*"
      },
      {
        Effect = "Allow"
        Action = "rds-db:connect",
        Resource = "arn:aws:rds-db:us-east-1:123456789012:dbuser:mydb/iam_user"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "arn:aws:logs:us-east-1:123456789012:*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_s3_aurora.arn
}

2.4 Create an Aurora Serverless Database

Create a file called aurora.tf:

resource "aws_rds_cluster" "aurora" {
  cluster_identifier      = "aurora-duckdb-cluster"
  engine                 = "aurora-mysql"
  engine_mode            = "serverless"
  database_name          = "mydb"
  master_username        = "admin"
  master_password        = "securepassword"
  backup_retention_period = 7
  storage_encrypted      = true
  skip_final_snapshot    = true
}

resource "aws_rds_cluster_instance" "aurora_instance" {
  cluster_identifier = aws_rds_cluster.aurora.id
  instance_class     = "db.serverless"
  engine            = aws_rds_cluster.aurora.engine
}

2.5 Create a Lambda Function

Create a file called lambda.tf:

resource "aws_lambda_function" "duckdb_lambda" {
  function_name    = "DuckDBETL"
  role            = aws_iam_role.lambda_role.arn
  handler         = "lambda_function.lambda_handler"
  runtime         = "python3.9"
  timeout         = 30
  memory_size     = 512

  filename         = "lambda.zip"
  source_code_hash = filebase64sha256("lambda.zip")

  environment {
    variables = {
      S3_BUCKET  = aws_s3_bucket.csv_bucket.id
      DB_HOST    = aws_rds_cluster.aurora.endpoint
      DB_NAME    = "mydb"
      DB_USER    = "iam_user" # IAM-auth database user, matching the rds-db:connect ARN
      DB_PORT    = "3306"
    }
  }
}
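
The function also needs the DuckDB layer built earlier. One possible way to manage it from Terraform is sketched below; the resource name duckdb_layer and the local duckdb_layer.zip path are assumptions, and the resulting ARN would be referenced from the function via a layers argument:

resource "aws_lambda_layer_version" "duckdb_layer" {
  layer_name          = "duckdb"
  filename            = "duckdb_layer.zip"
  source_code_hash    = filebase64sha256("duckdb_layer.zip")
  compatible_runtimes = ["python3.9"]
}

# In aws_lambda_function.duckdb_lambda, add:
#   layers = [aws_lambda_layer_version.duckdb_layer.arn]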

resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowExecutionFromS3"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.duckdb_lambda.function_name
  principal     = "s3.amazonaws.com"
}
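
The permission above implies that uploads to the bucket should trigger the function, but the configuration does not wire that up. A possible notification resource is sketched below (the .csv suffix filter is an assumption):

resource "aws_s3_bucket_notification" "csv_upload" {
  bucket = aws_s3_bucket.csv_bucket.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.duckdb_lambda.arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".csv"
  }

  depends_on = [aws_lambda_permission.allow_s3]
}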

2.6 Deploy Terraform

  1. Initialize Terraform:
    terraform init
  2. Plan the deployment:
    terraform plan
  3. Apply the configuration:
    terraform apply --auto-approve

3. AWS Lambda Function Code

Save this as lambda_function.py:

import boto3
import duckdb
import os

# AWS Configuration
S3_BUCKET = os.getenv("S3_BUCKET")
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PORT = os.getenv("DB_PORT")

s3_client = boto3.client("s3")
rds_client = boto3.client("rds")

def lambda_handler(event, context):
    try:
        # Generate IAM authentication token for Aurora
        token = rds_client.generate_db_auth_token(
            DBHostname=DB_HOST,
            Port=int(DB_PORT),
            DBUsername=DB_USER
        )

        # Fetch CSV from S3
        file_path = "/tmp/data.csv"
        s3_client.download_file(S3_BUCKET, "data.csv", file_path)

        # Load CSV into DuckDB
        con = duckdb.connect(database=":memory:")
        con.execute(f"CREATE TABLE temp AS SELECT * FROM read_csv_auto('{file_path}')")

        # Process data: count the active rows we are about to load
        row_count = con.execute(
            "SELECT COUNT(*) FROM temp WHERE active = 1"
        ).fetchone()[0]

        # Connect to Aurora via DuckDB's MySQL extension
        con.execute("SET home_directory='/tmp'")  # writable location for extensions in Lambda
        con.execute("INSTALL mysql")
        con.execute("LOAD mysql")
        con.execute(
            f"ATTACH 'host={DB_HOST} port={DB_PORT} user={DB_USER} "
            f"password={token} database={DB_NAME}' AS aurora (TYPE MYSQL)"
        )

        # Bulk insert the filtered rows into Aurora
        con.execute("INSERT INTO aurora.users SELECT name, email FROM temp WHERE active = 1")

        return {"statusCode": 200, "body": f"Inserted {row_count} rows into Aurora"}

    except Exception as e:
        return {"statusCode": 500, "body": str(e)}

4. Package and Deploy Lambda Code

  1. Zip the Lambda function:
    zip lambda.zip lambda_function.py
  2. Upload and deploy using Terraform:
    terraform apply --auto-approve

5. Summary

  • S3: stores the CSV file.
  • Lambda: reads the CSV, processes it with DuckDB, and inserts into Aurora.
  • Aurora Serverless: stores the processed data.
  • IAM Role: grants Lambda access to S3 and Aurora.
  • Terraform: automates the infrastructure deployment.

This Terraform setup automates the entire pipeline from data ingestion to storage, ensuring a secure, scalable, and cost-effective ETL solution.
