@jiangzhuo
Created August 25, 2025 19:02
duckdb-lambda-2025
-- DuckDB on Lambda: Common Query Patterns and Use Cases
-- Example query patterns for running DuckDB on AWS Lambda against data in S3 and over HTTP
-- ============================================================================
-- BASIC DATA INGESTION
-- ============================================================================
-- 1. Read Parquet files from S3
SELECT *
FROM read_parquet('s3://your-bucket/data/sales_2025.parquet')
LIMIT 10;
-- 2. Read multiple Parquet files with glob pattern
SELECT COUNT(*) as total_records,
MIN(date) as earliest_date,
MAX(date) as latest_date
FROM read_parquet('s3://your-bucket/data/sales_*.parquet');
-- 3. Read CSV with automatic schema detection
SELECT *
FROM read_csv_auto('s3://your-bucket/raw/customers.csv', header=true)
LIMIT 5;
-- 4. Read JSON data
SELECT *
FROM read_json_auto('s3://your-bucket/events/user_events.json');
-- 5. Query data directly from HTTP endpoints
SELECT *
FROM read_parquet('https://datasets.example.com/public/dataset.parquet')
WHERE year = 2025;
-- ============================================================================
-- ANALYTICAL QUERIES
-- ============================================================================
-- 6. Daily Active Users (DAU) calculation
SELECT
DATE(event_timestamp) as day,
COUNT(DISTINCT user_id) as daily_active_users,
COUNT(*) as total_events
FROM read_parquet('s3://your-bucket/events/app_events_*.parquet')
WHERE event_timestamp >= CURRENT_DATE - INTERVAL 7 DAY
GROUP BY DATE(event_timestamp)
ORDER BY day DESC;
-- 7. Revenue analysis with window functions
SELECT
product_category,
sale_date,
daily_revenue,
SUM(daily_revenue) OVER (
PARTITION BY product_category
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as rolling_7day_revenue,
AVG(daily_revenue) OVER (
PARTITION BY product_category
ORDER BY sale_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) as rolling_30day_avg
FROM (
SELECT
product_category,
DATE(order_timestamp) as sale_date,
SUM(amount) as daily_revenue
FROM read_parquet('s3://your-bucket/sales/*.parquet')
WHERE order_timestamp >= CURRENT_DATE - INTERVAL 90 DAY
GROUP BY product_category, DATE(order_timestamp)
);
-- 8. Customer cohort analysis
WITH cohorts AS (
SELECT
user_id,
DATE_TRUNC('month', MIN(order_date)) as cohort_month
FROM read_parquet('s3://your-bucket/orders/*.parquet')
GROUP BY user_id
)
SELECT
cohort_month,
DATE_TRUNC('month', o.order_date) as order_month,
COUNT(DISTINCT c.user_id) as active_users,
SUM(o.amount) as total_revenue,
AVG(o.amount) as avg_order_value
FROM cohorts c
JOIN read_parquet('s3://your-bucket/orders/*.parquet') o
ON c.user_id = o.user_id
GROUP BY cohort_month, DATE_TRUNC('month', o.order_date)
ORDER BY cohort_month, order_month;
-- 9. Funnel analysis
WITH funnel_events AS (
SELECT
session_id,
MAX(CASE WHEN event_name = 'page_view' THEN 1 ELSE 0 END) as viewed,
MAX(CASE WHEN event_name = 'add_to_cart' THEN 1 ELSE 0 END) as added_to_cart,
MAX(CASE WHEN event_name = 'checkout' THEN 1 ELSE 0 END) as checked_out,
MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) as purchased
FROM read_parquet('s3://your-bucket/events/web_events_*.parquet')
WHERE event_date = CURRENT_DATE - 1
GROUP BY session_id
)
SELECT
SUM(viewed) as page_views,
SUM(added_to_cart) as cart_adds,
SUM(checked_out) as checkouts,
SUM(purchased) as purchases,
ROUND(100.0 * SUM(added_to_cart) / NULLIF(SUM(viewed), 0), 2) as view_to_cart_rate,
ROUND(100.0 * SUM(checked_out) / NULLIF(SUM(added_to_cart), 0), 2) as cart_to_checkout_rate,
ROUND(100.0 * SUM(purchased) / NULLIF(SUM(checked_out), 0), 2) as checkout_to_purchase_rate
FROM funnel_events;
-- ============================================================================
-- ETL AND DATA TRANSFORMATION
-- ============================================================================
-- 10. Transform and aggregate data for reporting
COPY (
SELECT
DATE_TRUNC('hour', timestamp) as hour,
region,
COUNT(*) as request_count,
AVG(response_time_ms) as avg_response_time,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY response_time_ms) as p50_response_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) as p95_response_time,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time_ms) as p99_response_time
FROM read_parquet('s3://your-bucket/logs/api_logs_*.parquet')
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL 24 HOUR
GROUP BY DATE_TRUNC('hour', timestamp), region
) TO 's3://your-bucket/reports/hourly_api_metrics.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);
-- 11. Data deduplication
COPY (
SELECT DISTINCT ON (user_id, event_id) *
FROM read_parquet('s3://your-bucket/raw/events_*.parquet')
ORDER BY user_id, event_id, ingested_at DESC
) TO 's3://your-bucket/cleaned/deduplicated_events.parquet' (FORMAT PARQUET);
-- 12. Join and enrich data from multiple sources
COPY (
SELECT
o.*,
c.customer_name,
c.customer_segment,
c.lifetime_value,
p.product_name,
p.product_category,
p.unit_cost
FROM read_parquet('s3://your-bucket/orders/*.parquet') o
LEFT JOIN read_parquet('s3://your-bucket/customers/current.parquet') c
ON o.customer_id = c.customer_id
LEFT JOIN read_csv_auto('https://api.example.com/products.csv') p
ON o.product_id = p.product_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL 30 DAY
) TO 's3://your-bucket/enriched/orders_enriched.parquet' (FORMAT PARQUET);
-- ============================================================================
-- ADVANCED ANALYTICS
-- ============================================================================
-- 13. Time series decomposition with moving averages
WITH time_series AS (
SELECT
DATE(timestamp) as date,
SUM(value) as daily_value
FROM read_parquet('s3://your-bucket/metrics/*.parquet')
WHERE timestamp >= CURRENT_DATE - INTERVAL 365 DAY
GROUP BY DATE(timestamp)
),
moving_averages AS (
SELECT
date,
daily_value,
AVG(daily_value) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as ma7,
AVG(daily_value) OVER (ORDER BY date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) as ma30
FROM time_series
)
SELECT
date,
daily_value,
ma7,
ma30,
daily_value - ma7 as deviation_from_ma7,
CASE
WHEN daily_value > ma30 * 1.2 THEN 'spike'
WHEN daily_value < ma30 * 0.8 THEN 'drop'
ELSE 'normal'
END as anomaly_flag
FROM moving_averages
ORDER BY date DESC;
-- 14. User retention analysis
WITH user_activity AS (
SELECT
user_id,
DATE(first_seen) as signup_date,
DATE(last_seen) as last_active_date
FROM (
SELECT
user_id,
MIN(event_timestamp) as first_seen,
MAX(event_timestamp) as last_seen
FROM read_parquet('s3://your-bucket/events/user_events_*.parquet')
GROUP BY user_id
)
),
retention_cohorts AS (
SELECT
DATE_TRUNC('week', signup_date) as cohort_week,
COUNT(DISTINCT user_id) as cohort_size,
COUNT(DISTINCT CASE
WHEN last_active_date >= signup_date + INTERVAL 7 DAY THEN user_id
END) as week_1_retained,
COUNT(DISTINCT CASE
WHEN last_active_date >= signup_date + INTERVAL 30 DAY THEN user_id
END) as day_30_retained,
COUNT(DISTINCT CASE
WHEN last_active_date >= signup_date + INTERVAL 90 DAY THEN user_id
END) as day_90_retained
FROM user_activity
WHERE signup_date >= CURRENT_DATE - INTERVAL 180 DAY
GROUP BY DATE_TRUNC('week', signup_date)
)
SELECT
cohort_week,
cohort_size,
ROUND(100.0 * week_1_retained / cohort_size, 2) as week_1_retention_rate,
ROUND(100.0 * day_30_retained / cohort_size, 2) as day_30_retention_rate,
ROUND(100.0 * day_90_retained / cohort_size, 2) as day_90_retention_rate
FROM retention_cohorts
ORDER BY cohort_week DESC;
-- 15. Pivot table for cross-tabulation analysis
PIVOT (
SELECT
product_category,
DATE_TRUNC('month', order_date) as month,
SUM(revenue) as total_revenue
FROM read_parquet('s3://your-bucket/sales/*.parquet')
WHERE order_date >= CURRENT_DATE - INTERVAL 12 MONTH
GROUP BY product_category, DATE_TRUNC('month', order_date)
) ON month USING SUM(total_revenue)
GROUP BY product_category
ORDER BY product_category;
-- ============================================================================
-- PERFORMANCE OPTIMIZATION PATTERNS
-- ============================================================================
-- 16. Use sampling for exploration
SELECT *
FROM read_parquet('s3://your-bucket/large_dataset/*.parquet')
USING SAMPLE 10000 ROWS; -- Sample 10,000 random rows
-- 17. Create temporary views for complex queries
CREATE TEMP VIEW daily_metrics AS
SELECT
DATE(timestamp) as date,
COUNT(*) as events,
COUNT(DISTINCT user_id) as unique_users
FROM read_parquet('s3://your-bucket/events/*.parquet')
GROUP BY DATE(timestamp);
-- Now use the view multiple times efficiently
SELECT * FROM daily_metrics WHERE date = CURRENT_DATE - 1;
SELECT AVG(events) as avg_daily_events FROM daily_metrics;
-- 18. Use EXPLAIN to understand query performance
EXPLAIN ANALYZE
SELECT COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE date_column >= '2025-01-01';
-- ============================================================================
-- WORKING WITH SEMI-STRUCTURED DATA
-- ============================================================================
-- 19. Parse JSON columns
SELECT
user_id,
json_extract_string(metadata, '$.browser') as browser,
json_extract_string(metadata, '$.os') as operating_system,
CAST(json_extract(metadata, '$.screen_width') AS INTEGER) as screen_width
FROM read_parquet('s3://your-bucket/events/web_events.parquet')
WHERE json_valid(metadata);
-- 20. Unnest arrays
SELECT
order_id,
customer_id,
unnest(product_ids) as product_id,
unnest(quantities) as quantity
FROM read_parquet('s3://your-bucket/orders/orders_nested.parquet');
-- ============================================================================
-- DATA QUALITY CHECKS
-- ============================================================================
-- 21. Data profiling
SELECT
'total_rows' as metric, COUNT(*) as value
FROM read_parquet('s3://your-bucket/data/*.parquet')
UNION ALL
SELECT
'null_customer_ids', COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE customer_id IS NULL
UNION ALL
SELECT
'duplicate_order_ids', COUNT(*) - COUNT(DISTINCT order_id)
FROM read_parquet('s3://your-bucket/data/*.parquet')
UNION ALL
SELECT
'future_dates', COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE order_date > CURRENT_DATE;
-- 22. Schema validation
-- DESCRIBE reports column_name, column_type, and nullability for the scanned file
DESCRIBE SELECT *
FROM read_parquet('s3://your-bucket/data/sample.parquet');
-- ============================================================================
-- NOTES ON USAGE
-- ============================================================================
/*
Best Practices for DuckDB on Lambda:
1. File Organization:
- Use larger Parquet files (100MB-1GB) rather than many small files
- Partition by date for time-based queries
- Use ZSTD compression for better compression ratio
2. Query Optimization:
- Push filters down to the scan level
- Use column pruning (SELECT only needed columns)
- Leverage DuckDB's automatic query optimization
3. Memory Management:
- Start with 2048MB Lambda memory
- Monitor actual usage and adjust
- DuckDB will spill to disk if needed
4. Caching:
- Enable HTTP and object caching
- Reuse connections across invocations
- Cache frequently accessed data in temp views
5. Error Handling:
- Always validate user input
- Use TRY expressions for safe casting
- Implement query timeouts
*/
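-- The partitioning and safe-casting practices above can be illustrated with two short
-- queries. Both are sketches against hypothetical paths and columns (raw_amount,
-- year=/month= partition directories), not data referenced elsewhere in this gist.
-- Safe casting: TRY_CAST yields NULL for rows that fail to parse instead of aborting the query
SELECT
  order_id,
  TRY_CAST(raw_amount AS DECIMAL(12, 2)) AS amount
FROM read_csv_auto('s3://your-bucket/raw/payments.csv')
WHERE TRY_CAST(raw_amount AS DECIMAL(12, 2)) IS NOT NULL;
-- Hive-style date partitions: hive_partitioning exposes year/month as columns, so the
-- filter prunes whole directories instead of scanning every file
SELECT COUNT(*) AS events
FROM read_parquet('s3://your-bucket/events/year=*/month=*/*.parquet', hive_partitioning=true)
WHERE year = 2025 AND month = 8;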
/**
* DuckDB Lambda Query Handler
* Complete implementation example for serverless analytics
*/
import { APIGatewayEvent, Context } from 'aws-lambda';
import DuckDB from 'duckdb';
import { metricScope, Unit } from 'aws-embedded-metrics';
// Patch BigInt for JSON serialization
(BigInt.prototype as any).toJSON = function() {
return this.toString();
};
// Initialize DuckDB with in-memory database
const duckDB = new DuckDB.Database(':memory:', {
allow_unsigned_extensions: 'true'
});
// Create a reusable connection
const connection = duckDB.connect();
// Track initialization state
let isInitialized = false;
// Query filter for security - based on serverless-duckdb wiki documentation
const filterQuery = (query: string | undefined, isRemoteQuery: boolean = true): string => {
// Allow all operations for initialization queries (isRemoteQuery: false)
if (!isRemoteQuery) return query || '';
// Apply strict filtering for remote (user-submitted) queries
if (query && isRemoteQuery) {
const lowerQuery = query.toLowerCase();
// Block access to DuckDB internal settings
if (lowerQuery.indexOf('duckdb_settings') > -1) {
return `SELECT 'Function is disabled' as error`;
}
// Prevent extension installation
if (query.trim().toLowerCase().startsWith('install')) {
return `SELECT 'Extension installation disabled' as error`;
}
// Block extension loading
if (query.trim().toLowerCase().startsWith('load')) {
return `SELECT 'Extension loading is disabled' as error`;
}
// Prevent configuration changes (substring match, so queries containing
// words like 'offset' or 'dataset' are also rejected)
if (lowerQuery.indexOf('set') > -1) {
return `SELECT 'Using SET is disabled' as error`;
}
// Block PRAGMA statements
if (lowerQuery.indexOf('pragma') > -1) {
return `SELECT 'Using PRAGMA is disabled' as error`;
}
}
return query || '';
};
// Promisified query execution
const executeQuery = (sql: string, isRemoteQuery: boolean = true): Promise<any> => {
return new Promise((resolve, reject) => {
const filteredQuery = filterQuery(sql, isRemoteQuery);
connection.all(filteredQuery, (err, result) => {
if (err) reject(err);
else resolve(result);
});
});
};
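/**
 * The best-practices notes above call for query timeouts, and the serverless.yml below
 * sets a QUERY_TIMEOUT environment variable that the handler never reads. A minimal
 * sketch (assuming the executeQuery helper above) that bounds how long the Lambda waits
 * for a result; DuckDB keeps working until the invocation ends, so this caps response
 * time rather than cancelling the query itself.
 */
const executeQueryWithTimeout = (
  sql: string,
  timeoutMs: number = Number(process.env.QUERY_TIMEOUT ?? 25000)
): Promise<any> => {
  let timer: NodeJS.Timeout | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Query timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  return Promise.race([executeQuery(sql), timeout]).finally(() => {
    if (timer) clearTimeout(timer);
  });
};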
// Initialize DuckDB with optimal settings for Lambda - based on wiki documentation
const initializeDuckDB = async (): Promise<number> => {
const startTime = Date.now();
try {
// Set temporary directory for Lambda environment
await executeQuery(`SET home_directory='/tmp';`, false);
// Install and load essential extensions
await executeQuery(`INSTALL httpfs;`, false);
await executeQuery(`LOAD httpfs;`, false);
// Enable Lambda-optimized extension repository
await executeQuery(
`SET custom_extension_repository = 'https://extensions.quacking.cloud';`,
false
);
// Performance optimizations from wiki
// Enable HTTP metadata caching - caches HTTP headers to avoid repeated requests
await executeQuery(`SET enable_http_metadata_cache=true;`, false);
// Enable object caching - stores Parquet metadata for 4x speedup
await executeQuery(`SET enable_object_cache=true;`, false);
// Security configurations from wiki
// Disable local filesystem access to protect Lambda environment
await executeQuery(`SET disabled_filesystems = 'LocalFileSystem';`, false);
// Lock configuration to prevent runtime changes
await executeQuery(`SET lock_configuration = true;`, false);
// Optional: Configure AWS credentials if needed
// Uncomment if not using IAM role credentials
/*
await executeQuery(`SET s3_region='${process.env.AWS_REGION}';`, false);
await executeQuery(`SET s3_access_key_id='${process.env.AWS_ACCESS_KEY_ID}';`, false);
await executeQuery(`SET s3_secret_access_key='${process.env.AWS_SECRET_ACCESS_KEY}';`, false);
await executeQuery(`SET s3_session_token='${process.env.AWS_SESSION_TOKEN}';`, false);
*/
isInitialized = true;
return Date.now() - startTime;
} catch (error) {
console.error('DuckDB initialization failed:', error);
throw error;
}
};
// Main Lambda handler with metrics
export const handler = metricScope(metrics =>
async (event: APIGatewayEvent, context: Context) => {
// Set up metrics dimensions
metrics.putDimensions({ Service: 'DuckDBQueryService' });
metrics.setProperty('RequestId', context.awsRequestId);
try {
// Validate request
if (!event.body) {
return {
statusCode: 400,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ error: 'Request body is required' })
};
}
// Parse request body
const { query, options = {} } = JSON.parse(event.body);
if (!query) {
return {
statusCode: 400,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ error: 'Query parameter is required' })
};
}
// Initialize DuckDB on cold start
if (!isInitialized) {
const initDuration = await initializeDuckDB();
metrics.putMetric('InitializationDuration', initDuration, Unit.Milliseconds);
console.log(`DuckDB initialized in ${initDuration}ms`);
}
// Execute query with timing
const queryStartTime = Date.now();
const result = await executeQuery(query);
const queryDuration = Date.now() - queryStartTime;
// Record metrics
metrics.putMetric('QueryDuration', queryDuration, Unit.Milliseconds);
metrics.putMetric('ResultRows', Array.isArray(result) ? result.length : 0, Unit.Count);
// Log query details for debugging
console.log({
requestId: context.awsRequestId,
queryDuration,
resultRows: Array.isArray(result) ? result.length : 0,
memoryUsed: process.memoryUsage().heapUsed / 1024 / 1024,
memoryLimit: context.memoryLimitInMB
});
// Return successful response
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'X-Query-Duration-Ms': queryDuration.toString(),
'X-Request-Id': context.awsRequestId
},
body: JSON.stringify({
success: true,
data: result,
metadata: {
queryDuration,
rowCount: Array.isArray(result) ? result.length : 0,
requestId: context.awsRequestId
}
})
};
} catch (error: any) {
// Log error details
console.error('Query execution failed:', {
requestId: context.awsRequestId,
error: error.message,
stack: error.stack
});
// Record error metric
metrics.putMetric('QueryErrors', 1, Unit.Count);
// Return error response
return {
statusCode: 400,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
success: false,
error: error.message || 'Query execution failed',
requestId: context.awsRequestId
})
};
}
}
);
// Handle Lambda runtime shutdown gracefully
process.on('SIGTERM', () => {
console.log('SIGTERM received, closing DuckDB connection');
try {
// Exit only after both handles have closed; calling process.exit() immediately
// would pre-empt these asynchronous close callbacks
connection.close((err) => {
if (err) console.error('Error closing connection:', err);
duckDB.close((closeErr) => {
if (closeErr) console.error('Error closing database:', closeErr);
process.exit(0);
});
});
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
});
/**
* Example queries that work with this handler:
*
* 1. Query S3 Parquet files:
* SELECT * FROM read_parquet('s3://bucket/file.parquet') LIMIT 10
*
* 2. Aggregate data:
* SELECT COUNT(*), AVG(amount) FROM read_parquet('s3://bucket/data/*.parquet')
*
* 3. Join multiple sources:
* SELECT * FROM read_parquet('s3://bucket/users.parquet') u
* JOIN read_csv_auto('https://example.com/orders.csv') o ON u.id = o.user_id
*
* 4. Create temporary views:
* CREATE TEMP VIEW sales AS SELECT * FROM read_parquet('s3://bucket/sales.parquet');
* SELECT date_trunc('month', date) as month, SUM(amount) FROM sales GROUP BY month
*/
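/**
 * A hedged sketch of how a client could call the POST /v1/query endpoint defined in the
 * serverless.yml below. The URL and API key are placeholders; the request shape
 * ({ query, options }) mirrors what the handler above parses from event.body.
 */
const runRemoteQuery = async (sql: string): Promise<any> => {
  // Node 18+ provides fetch globally (the service runs on nodejs20.x)
  const response = await fetch('https://YOUR_API_ID.execute-api.us-east-1.amazonaws.com/dev/v1/query', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Api-Key': process.env.DUCKDB_API_KEY ?? ''
    },
    body: JSON.stringify({ query: sql, options: {} })
  });
  if (!response.ok) throw new Error(`Query failed with status ${response.status}`);
  return response.json();
};
// Example:
// runRemoteQuery("SELECT COUNT(*) FROM read_parquet('s3://bucket/data/*.parquet')");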
# Production-Ready Serverless Configuration for DuckDB Lambda
service: duckdb-analytics
frameworkVersion: '3'
# Plugins for enhanced functionality
plugins:
- serverless-iam-roles-per-function # Fine-grained IAM permissions
- serverless-prune-plugin # Automatic version cleanup
- serverless-esbuild # TypeScript compilation and bundling
custom:
# API versioning
api:
version: 'v1'
# DuckDB Layer versions (check for latest at https://github.com/tobilg/duckdb-nodejs-layer)
duckdbLayers:
x86: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-nodejs-x86:18'
arm64: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-nodejs-arm64:16'
extensions: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-extensions-nodejs-x86:6'
# S3 bucket configuration for data access
dataBucket: ${env:DATA_BUCKET, 'my-analytics-data'}
# S3 Express One Zone configuration (optional, uncomment to enable)
# s3Express:
# availabilityZoneId: 'use1-az4' # Choose based on your region
# bucketName: 'duckdb-express--${self:custom.s3Express.availabilityZoneId}--x-s3'
# esbuild configuration for optimal bundling
esbuild:
bundle: true
minify: false # Set to true for production
sourcemap: true
exclude:
- 'duckdb' # Provided by Lambda layer
- 'aws-lambda' # AWS Lambda types
- 'dtrace-provider' # Optional dependency
target: 'node20'
platform: 'node'
format: 'cjs'
# Prune old Lambda versions automatically
prune:
automatic: true
number: 3 # Keep last 3 versions
provider:
name: aws
runtime: nodejs20.x
architecture: x86_64 # or arm64 for Graviton2
region: ${opt:region, 'us-east-1'}
stage: ${opt:stage, 'dev'}
# CloudWatch Logs retention
logRetentionInDays: 7 # Adjust based on requirements
# API Gateway configuration
apiGateway:
# API Keys for authentication
apiKeys:
- name: DuckDBQueryKey
description: 'API key for DuckDB query endpoint'
# Usage plan for rate limiting
usagePlan:
quota:
limit: 10000
period: DAY
throttle:
burstLimit: 100
rateLimit: 50
# Global environment variables
environment:
# AWS SDK optimizations
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
NODE_OPTIONS: '--enable-source-maps'
# Application settings
STAGE: '${self:provider.stage}'
LOG_LEVEL: '${env:LOG_LEVEL, "info"}'
DATA_BUCKET: '${self:custom.dataBucket}'
# DuckDB settings
DUCKDB_CACHE_ENABLED: 'true'
DUCKDB_MEMORY_LIMIT: '1GB' # Adjust based on Lambda memory
# S3 Express endpoint (uncomment if using S3 Express)
# AWS_S3_EXPRESS_ENDPOINT: 's3express-${self:custom.s3Express.availabilityZoneId}.${self:provider.region}.amazonaws.com'
# Default Lambda configuration
timeout: 30 # API Gateway REST APIs cap integration responses at 29 seconds by default
memorySize: 2048 # Adjust based on workload
# X-Ray tracing for performance monitoring
tracing:
lambda: true
apiGateway: true
functions:
# Main query endpoint with API Gateway
query:
handler: src/functions/query.handler
description: 'Execute DuckDB queries via REST API'
memorySize: 2048 # Override default if needed
timeout: 30
reservedConcurrency: 10 # Cap concurrent executions (use provisionedConcurrency to reduce cold starts)
# Function-specific environment variables
environment:
QUERY_TIMEOUT: '25000' # 25 seconds, leaving buffer for Lambda timeout
# Lambda layers
layers:
- ${self:custom.duckdbLayers.x86}
# IAM permissions for this function
iamRoleStatements:
# S3 read permissions for data bucket
- Effect: Allow
Action:
- s3:GetObject
- s3:GetObjectVersion
Resource: 'arn:aws:s3:::${self:custom.dataBucket}/*'
- Effect: Allow
Action:
- s3:ListBucket
- s3:GetBucketLocation
Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
# CloudWatch metrics
- Effect: Allow
Action:
- cloudwatch:PutMetricData
Resource: '*'
# API Gateway event
events:
- http:
path: ${self:custom.api.version}/query
method: post
cors:
origin: '*' # Configure for your domain in production
headers:
- Content-Type
- X-Api-Key
- X-Request-Id
private: true # Requires API key
request:
schemas:
application/json: ${file(schemas/query-request.json)}
# Streaming query endpoint with Function URL
streamingQuery:
handler: src/functions/streamingQuery.handler
description: 'Stream large query results via Apache Arrow'
memorySize: 3008 # Higher memory for streaming
timeout: 900 # 15 minutes max
# Function URL configuration
url:
invokeMode: RESPONSE_STREAM
cors:
allowOrigins:
- '*' # Configure for production
allowHeaders:
- Content-Type
- X-Request-Id
maxAge: 86400
layers:
- ${self:custom.duckdbLayers.x86}
- ${self:custom.duckdbLayers.extensions} # Additional extensions
iamRoleStatements:
# Same S3 permissions
- Effect: Allow
Action:
- s3:GetObject
- s3:GetObjectVersion
Resource: 'arn:aws:s3:::${self:custom.dataBucket}/*'
- Effect: Allow
Action:
- s3:ListBucket
Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
# Batch processing function (invoked by EventBridge or S3 events)
batchProcessor:
handler: src/functions/batchProcessor.handler
description: 'Process data files on S3 upload'
memorySize: 10240 # Maximum memory for large batches
timeout: 900
layers:
- ${self:custom.duckdbLayers.x86}
- ${self:custom.duckdbLayers.extensions}
iamRoleStatements:
# Read from source bucket
- Effect: Allow
Action:
- s3:GetObject
Resource: 'arn:aws:s3:::${self:custom.dataBucket}/raw/*'
# Write to processed bucket
- Effect: Allow
Action:
- s3:PutObject
- s3:PutObjectAcl
Resource: 'arn:aws:s3:::${self:custom.dataBucket}/processed/*'
- Effect: Allow
Action:
- s3:ListBucket
Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
events:
# Trigger on S3 uploads
- s3:
bucket: ${self:custom.dataBucket}
event: s3:ObjectCreated:*
rules:
- prefix: raw/
- suffix: .csv
# Scheduled processing
- schedule:
rate: rate(1 hour)
enabled: false # Enable when ready
input:
action: 'hourly-aggregation'
# S3 Express One Zone function (uncomment to enable)
# queryS3Express:
# handler: src/functions/queryS3Express.handler
# description: 'Ultra-low latency queries with S3 Express'
# memorySize: 10240 # Maximum memory
# timeout: 30
#
# layers:
# - ${self:custom.duckdbLayers.extensions}
#
# vpc:
# securityGroupIds:
# - !GetAtt VpcEndpointLambdaSecurityGroup.GroupId
# subnetIds:
# - !GetAtt PrivateSubnet.SubnetId
#
# iamRoleStatements:
# - Effect: Allow
# Action:
# - s3express:CreateSession
# Resource:
# - !Sub 'arn:aws:s3express:${AWS::Region}:${AWS::AccountId}:bucket/${self:custom.s3Express.bucketName}'
#
# events:
# - http:
# path: ${self:custom.api.version}/query-express
# method: post
# cors: true
# private: true
# CloudFormation resources
resources:
Resources:
# S3 bucket for query results (optional)
QueryResultsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: ${self:service}-query-results-${self:provider.stage}
LifecycleConfiguration:
Rules:
- Id: DeleteOldResults
Status: Enabled
ExpirationInDays: 7 # Auto-delete after 7 days
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
# CloudWatch dashboard for monitoring
DuckDBDashboard:
Type: AWS::CloudWatch::Dashboard
Properties:
DashboardName: ${self:service}-${self:provider.stage}
DashboardBody: !Sub |
{
"widgets": [
{
"type": "metric",
"properties": {
"metrics": [
["AWS/Lambda", "Invocations", {"stat": "Sum"}],
[".", "Errors", {"stat": "Sum"}],
[".", "Duration", {"stat": "Average"}],
[".", "ConcurrentExecutions", {"stat": "Maximum"}]
],
"period": 300,
"stat": "Average",
"region": "${AWS::Region}",
"title": "Lambda Metrics"
}
}
]
}
# Alarms for monitoring
QueryErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: ${self:service}-${self:provider.stage}-query-errors
AlarmDescription: 'Alert when query function has errors'
MetricName: Errors
Namespace: AWS/Lambda
Dimensions:
- Name: FunctionName
Value: ${self:service}-${self:provider.stage}-query
Statistic: Sum
Period: 300
EvaluationPeriods: 1
Threshold: 5
ComparisonOperator: GreaterThanThreshold
TreatMissingData: notBreaching
# Outputs for reference
Outputs:
QueryApiUrl:
Description: 'API Gateway URL for queries'
Value: !Sub 'https://${ApiGatewayRestApi}.execute-api.${AWS::Region}.amazonaws.com/${self:provider.stage}'
Export:
Name: ${self:service}-${self:provider.stage}-api-url
QueryResultsBucket:
Description: 'S3 bucket for query results'
Value: !Ref QueryResultsBucket
Export:
Name: ${self:service}-${self:provider.stage}-results-bucket
DashboardUrl:
Description: 'CloudWatch Dashboard URL'
Value: !Sub 'https://console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${self:service}-${self:provider.stage}'
# Package individually for optimized deployments
package:
individually: true
patterns:
- '!.git/**'
- '!.gitignore'
- '!.DS_Store'
- '!npm-debug.log'
- '!.serverless/**'
- '!.serverless_plugins/**'
- '!tests/**'
- '!.vscode/**'
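The configuration above wires streamingQuery to a Lambda Function URL with invokeMode
RESPONSE_STREAM, but the gist does not include src/functions/streamingQuery.handler. Below
is a minimal sketch of what such a handler could look like, using the Lambda Node.js
streaming wrapper (awslambda.streamifyResponse) and newline-delimited JSON rather than the
Arrow IPC format the function description mentions; the imported executeQuery helper and
the batch size are assumptions, not code from the original gist.

// src/functions/streamingQuery.ts (illustrative sketch)
import { Context } from 'aws-lambda';
import { executeQuery } from './query'; // assumes the query module exports its helper
// awslambda is a global injected by the Lambda Node.js runtime when response streaming is enabled
declare const awslambda: {
  streamifyResponse: (
    fn: (event: any, responseStream: NodeJS.WritableStream, context: Context) => Promise<void>
  ) => any;
};
export const handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
  const { query } = JSON.parse(event.body ?? '{}');
  const rows: any[] = await executeQuery(query);
  // Write rows out in batches as newline-delimited JSON so the client can start
  // consuming results before the full payload is serialized in memory.
  const BATCH_SIZE = 1000; // assumption; tune to row width and available memory
  for (let i = 0; i < rows.length; i += BATCH_SIZE) {
    responseStream.write(rows.slice(i, i + BATCH_SIZE).map((r) => JSON.stringify(r)).join('\n') + '\n');
  }
  responseStream.end();
});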