duckdb-lambda-2025
-- DuckDB on Lambda: Common Query Patterns and Use Cases
-- These queries demonstrate the power and flexibility of DuckDB for serverless analytics

-- ============================================================================
-- BASIC DATA INGESTION
-- ============================================================================

-- 1. Read Parquet files from S3
SELECT *
FROM read_parquet('s3://your-bucket/data/sales_2025.parquet')
LIMIT 10;

-- 2. Read multiple Parquet files with a glob pattern
SELECT COUNT(*) as total_records,
       MIN(date) as earliest_date,
       MAX(date) as latest_date
FROM read_parquet('s3://your-bucket/data/sales_*.parquet');

-- 3. Read CSV with automatic schema detection
SELECT *
FROM read_csv_auto('s3://your-bucket/raw/customers.csv', header=true)
LIMIT 5;

-- 4. Read JSON data
SELECT *
FROM read_json_auto('s3://your-bucket/events/user_events.json');

-- 5. Query data directly from HTTP endpoints
SELECT *
FROM read_parquet('https://datasets.example.com/public/dataset.parquet')
WHERE year = 2025;
-- ============================================================================
-- ANALYTICAL QUERIES
-- ============================================================================

-- 6. Daily Active Users (DAU) calculation
SELECT
  DATE(event_timestamp) as day,
  COUNT(DISTINCT user_id) as daily_active_users,
  COUNT(*) as total_events
FROM read_parquet('s3://your-bucket/events/app_events_*.parquet')
WHERE event_timestamp >= CURRENT_DATE - INTERVAL 7 DAY
GROUP BY DATE(event_timestamp)
ORDER BY day DESC;

-- 7. Revenue analysis with window functions
SELECT
  product_category,
  sale_date,
  daily_revenue,
  SUM(daily_revenue) OVER (
    PARTITION BY product_category
    ORDER BY sale_date
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
  ) as rolling_7day_revenue,
  AVG(daily_revenue) OVER (
    PARTITION BY product_category
    ORDER BY sale_date
    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
  ) as rolling_30day_avg
FROM (
  SELECT
    product_category,
    DATE(order_timestamp) as sale_date,
    SUM(amount) as daily_revenue
  FROM read_parquet('s3://your-bucket/sales/*.parquet')
  WHERE order_timestamp >= CURRENT_DATE - INTERVAL 90 DAY
  GROUP BY product_category, DATE(order_timestamp)
);
-- 8. Customer cohort analysis
WITH cohorts AS (
  SELECT
    user_id,
    DATE_TRUNC('month', MIN(order_date)) as cohort_month
  FROM read_parquet('s3://your-bucket/orders/*.parquet')
  GROUP BY user_id
)
SELECT
  cohort_month,
  DATE_TRUNC('month', o.order_date) as order_month,
  COUNT(DISTINCT c.user_id) as active_users,
  SUM(o.amount) as total_revenue,
  AVG(o.amount) as avg_order_value
FROM cohorts c
JOIN read_parquet('s3://your-bucket/orders/*.parquet') o
  ON c.user_id = o.user_id
GROUP BY cohort_month, DATE_TRUNC('month', o.order_date)
ORDER BY cohort_month, order_month;

-- 9. Funnel analysis
WITH funnel_events AS (
  SELECT
    session_id,
    MAX(CASE WHEN event_name = 'page_view' THEN 1 ELSE 0 END) as viewed,
    MAX(CASE WHEN event_name = 'add_to_cart' THEN 1 ELSE 0 END) as added_to_cart,
    MAX(CASE WHEN event_name = 'checkout' THEN 1 ELSE 0 END) as checked_out,
    MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) as purchased
  FROM read_parquet('s3://your-bucket/events/web_events_*.parquet')
  WHERE event_date = CURRENT_DATE - 1
  GROUP BY session_id
)
SELECT
  SUM(viewed) as page_views,
  SUM(added_to_cart) as cart_adds,
  SUM(checked_out) as checkouts,
  SUM(purchased) as purchases,
  ROUND(100.0 * SUM(added_to_cart) / NULLIF(SUM(viewed), 0), 2) as view_to_cart_rate,
  ROUND(100.0 * SUM(checked_out) / NULLIF(SUM(added_to_cart), 0), 2) as cart_to_checkout_rate,
  ROUND(100.0 * SUM(purchased) / NULLIF(SUM(checked_out), 0), 2) as checkout_to_purchase_rate
FROM funnel_events;
-- ============================================================================
-- ETL AND DATA TRANSFORMATION
-- ============================================================================

-- 10. Transform and aggregate data for reporting
COPY (
  SELECT
    DATE_TRUNC('hour', timestamp) as hour,
    region,
    COUNT(*) as request_count,
    AVG(response_time_ms) as avg_response_time,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY response_time_ms) as p50_response_time,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) as p95_response_time,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time_ms) as p99_response_time
  FROM read_parquet('s3://your-bucket/logs/api_logs_*.parquet')
  WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL 24 HOUR
  GROUP BY DATE_TRUNC('hour', timestamp), region
) TO 's3://your-bucket/reports/hourly_api_metrics.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);

-- 11. Data deduplication
COPY (
  SELECT DISTINCT ON (user_id, event_id) *
  FROM read_parquet('s3://your-bucket/raw/events_*.parquet')
  ORDER BY user_id, event_id, ingested_at DESC
) TO 's3://your-bucket/cleaned/deduplicated_events.parquet' (FORMAT PARQUET);

-- 12. Join and enrich data from multiple sources
COPY (
  SELECT
    o.*,
    c.customer_name,
    c.customer_segment,
    c.lifetime_value,
    p.product_name,
    p.product_category,
    p.unit_cost
  FROM read_parquet('s3://your-bucket/orders/*.parquet') o
  LEFT JOIN read_parquet('s3://your-bucket/customers/current.parquet') c
    ON o.customer_id = c.customer_id
  LEFT JOIN read_csv_auto('https://api.example.com/products.csv') p
    ON o.product_id = p.product_id
  WHERE o.order_date >= CURRENT_DATE - INTERVAL 30 DAY
) TO 's3://your-bucket/enriched/orders_enriched.parquet' (FORMAT PARQUET);
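
-- Illustrative variant of the COPY patterns above: DuckDB's COPY can also write
-- Hive-style partitions with PARTITION_BY, which matches the "partition by date"
-- advice in the notes at the end of this file. Paths and columns are placeholders.
COPY (
  SELECT *, DATE(order_timestamp) as order_date
  FROM read_parquet('s3://your-bucket/orders/*.parquet')
) TO 's3://your-bucket/partitioned/orders' (FORMAT PARQUET, PARTITION_BY (order_date));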
-- ============================================================================
-- ADVANCED ANALYTICS
-- ============================================================================

-- 13. Time series decomposition with moving averages
WITH time_series AS (
  SELECT
    DATE(timestamp) as date,
    SUM(value) as daily_value
  FROM read_parquet('s3://your-bucket/metrics/*.parquet')
  WHERE timestamp >= CURRENT_DATE - INTERVAL 365 DAY
  GROUP BY DATE(timestamp)
),
moving_averages AS (
  SELECT
    date,
    daily_value,
    AVG(daily_value) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as ma7,
    AVG(daily_value) OVER (ORDER BY date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) as ma30
  FROM time_series
)
SELECT
  date,
  daily_value,
  ma7,
  ma30,
  daily_value - ma7 as deviation_from_ma7,
  CASE
    WHEN daily_value > ma30 * 1.2 THEN 'spike'
    WHEN daily_value < ma30 * 0.8 THEN 'drop'
    ELSE 'normal'
  END as anomaly_flag
FROM moving_averages
ORDER BY date DESC;

-- 14. User retention analysis
WITH user_activity AS (
  SELECT
    user_id,
    DATE(first_seen) as signup_date,
    DATE(last_seen) as last_active_date
  FROM (
    SELECT
      user_id,
      MIN(event_timestamp) as first_seen,
      MAX(event_timestamp) as last_seen
    FROM read_parquet('s3://your-bucket/events/user_events_*.parquet')
    GROUP BY user_id
  )
),
retention_cohorts AS (
  SELECT
    DATE_TRUNC('week', signup_date) as cohort_week,
    COUNT(DISTINCT user_id) as cohort_size,
    COUNT(DISTINCT CASE
      WHEN last_active_date >= signup_date + INTERVAL 7 DAY THEN user_id
    END) as day_7_retained,
    COUNT(DISTINCT CASE
      WHEN last_active_date >= signup_date + INTERVAL 30 DAY THEN user_id
    END) as day_30_retained,
    COUNT(DISTINCT CASE
      WHEN last_active_date >= signup_date + INTERVAL 90 DAY THEN user_id
    END) as day_90_retained
  FROM user_activity
  WHERE signup_date >= CURRENT_DATE - INTERVAL 180 DAY
  GROUP BY DATE_TRUNC('week', signup_date)
)
SELECT
  cohort_week,
  cohort_size,
  ROUND(100.0 * day_7_retained / cohort_size, 2) as day_7_retention_rate,
  ROUND(100.0 * day_30_retained / cohort_size, 2) as day_30_retention_rate,
  ROUND(100.0 * day_90_retained / cohort_size, 2) as day_90_retention_rate
FROM retention_cohorts
ORDER BY cohort_week DESC;

-- 15. Pivot table for cross-tabulation analysis
PIVOT (
  SELECT
    product_category,
    DATE_TRUNC('month', order_date) as month,
    SUM(revenue) as total_revenue
  FROM read_parquet('s3://your-bucket/sales/*.parquet')
  WHERE order_date >= CURRENT_DATE - INTERVAL 12 MONTH
  GROUP BY product_category, DATE_TRUNC('month', order_date)
) ON month USING SUM(total_revenue)
GROUP BY product_category
ORDER BY product_category;
-- ============================================================================
-- PERFORMANCE OPTIMIZATION PATTERNS
-- ============================================================================

-- 16. Use sampling for exploration
SELECT *
FROM read_parquet('s3://your-bucket/large_dataset/*.parquet')
USING SAMPLE 10000 ROWS; -- Sample 10,000 random rows
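
-- USING SAMPLE also accepts a percentage; an equivalent percentage-based variant:
SELECT *
FROM read_parquet('s3://your-bucket/large_dataset/*.parquet')
USING SAMPLE 1%;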
-- 17. Create temporary views for complex queries
CREATE TEMP VIEW daily_metrics AS
SELECT
  DATE(timestamp) as date,
  COUNT(*) as events,
  COUNT(DISTINCT user_id) as unique_users
FROM read_parquet('s3://your-bucket/events/*.parquet')
GROUP BY DATE(timestamp);

-- Now use the view multiple times efficiently
SELECT * FROM daily_metrics WHERE date = CURRENT_DATE - 1;
SELECT AVG(events) as avg_daily_events FROM daily_metrics;

-- 18. Use EXPLAIN to understand query performance
EXPLAIN ANALYZE
SELECT COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE date_column >= '2025-01-01';
-- ============================================================================
-- WORKING WITH SEMI-STRUCTURED DATA
-- ============================================================================

-- 19. Parse JSON columns
SELECT
  user_id,
  json_extract_string(metadata, '$.browser') as browser,
  json_extract_string(metadata, '$.os') as operating_system,
  CAST(json_extract(metadata, '$.screen_width') AS INTEGER) as screen_width
FROM read_parquet('s3://your-bucket/events/web_events.parquet')
WHERE json_valid(metadata);
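
-- The arrow operators are shorthand for the functions above
-- (-> is json_extract, ->> is json_extract_string):
SELECT
  user_id,
  metadata->>'$.browser' as browser
FROM read_parquet('s3://your-bucket/events/web_events.parquet')
WHERE json_valid(metadata);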
-- 20. Unnest arrays
SELECT
  order_id,
  customer_id,
  unnest(product_ids) as product_id,
  unnest(quantities) as quantity
FROM read_parquet('s3://your-bucket/orders/orders_nested.parquet');
-- ============================================================================
-- DATA QUALITY CHECKS
-- ============================================================================

-- 21. Data profiling
SELECT
  'total_rows' as metric, COUNT(*) as value
FROM read_parquet('s3://your-bucket/data/*.parquet')
UNION ALL
SELECT
  'null_customer_ids', COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE customer_id IS NULL
UNION ALL
SELECT
  'duplicate_order_ids', COUNT(*) - COUNT(DISTINCT order_id)
FROM read_parquet('s3://your-bucket/data/*.parquet')
UNION ALL
SELECT
  'future_dates', COUNT(*)
FROM read_parquet('s3://your-bucket/data/*.parquet')
WHERE order_date > CURRENT_DATE;
-- 22. Schema validation
-- DESCRIBE returns column_name, column_type, and nullability for the scanned file
DESCRIBE SELECT * FROM read_parquet('s3://your-bucket/data/sample.parquet');
-- ============================================================================
-- NOTES ON USAGE
-- ============================================================================
/*
Best Practices for DuckDB on Lambda:

1. File Organization:
   - Use larger Parquet files (100MB-1GB) rather than many small files
   - Partition by date for time-based queries
   - Use ZSTD compression for a better compression ratio

2. Query Optimization:
   - Push filters down to the scan level
   - Use column pruning (SELECT only the columns you need)
   - Leverage DuckDB's automatic query optimization

3. Memory Management:
   - Start with 2048MB of Lambda memory
   - Monitor actual usage and adjust
   - DuckDB will spill to disk if needed

4. Caching:
   - Enable HTTP and object caching
   - Reuse connections across invocations
   - Cache frequently accessed data in temp views

5. Error Handling:
   - Always validate user input
   - Use TRY expressions for safe casting (see the example after this comment)
   - Implement query timeouts
*/
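
-- Illustrative example for note 5: TRY_CAST yields NULL instead of raising an
-- error when a value cannot be converted (the path and column are hypothetical).
SELECT
  raw_amount,
  TRY_CAST(raw_amount AS DECIMAL(10, 2)) as amount
FROM read_csv_auto('s3://your-bucket/raw/payments.csv')
WHERE TRY_CAST(raw_amount AS DECIMAL(10, 2)) IS NULL; -- rows that fail to parse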
/**
 * DuckDB Lambda Query Handler
 * Complete implementation example for serverless analytics
 */
import { APIGatewayEvent, Context } from 'aws-lambda';
import DuckDB from 'duckdb';
import { metricScope, Unit } from 'aws-embedded-metrics';

// Patch BigInt for JSON serialization
(BigInt.prototype as any).toJSON = function() {
  return this.toString();
};

// Initialize DuckDB with an in-memory database
const duckDB = new DuckDB.Database(':memory:', {
  allow_unsigned_extensions: 'true'
});

// Create a reusable connection
const connection = duckDB.connect();

// Track initialization state
let isInitialized = false;
// Query filter for security - based on the serverless-duckdb wiki documentation
const filterQuery = (query: string | undefined, isRemoteQuery: boolean = true): string => {
  // Allow all operations for initialization queries (isRemoteQuery: false)
  if (!isRemoteQuery) return query || '';
  // Apply strict filtering for remote (user-submitted) queries
  if (query && isRemoteQuery) {
    const lowerQuery = query.toLowerCase();
    // Block access to DuckDB internal settings
    if (lowerQuery.indexOf('duckdb_settings') > -1) {
      return `SELECT 'Function is disabled' as error`;
    }
    // Prevent extension installation
    if (query.trim().toLowerCase().startsWith('install')) {
      return `SELECT 'Extension installation disabled' as error`;
    }
    // Block extension loading
    if (query.trim().toLowerCase().startsWith('load')) {
      return `SELECT 'Extension loading is disabled' as error`;
    }
    // Prevent configuration changes
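    // Note: this substring check is deliberately coarse -- it also rejects
    // queries that merely contain 'set' (e.g. in 'offset' or 'dataset')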
    if (lowerQuery.indexOf('set') > -1) {
      return `SELECT 'Using SET is disabled' as error`;
    }
    // Block PRAGMA statements
    if (lowerQuery.indexOf('pragma') > -1) {
      return `SELECT 'Using PRAGMA is disabled' as error`;
    }
  }
  return query || '';
};

// Promisified query execution
const executeQuery = (sql: string, isRemoteQuery: boolean = true): Promise<any> => {
  return new Promise((resolve, reject) => {
    const filteredQuery = filterQuery(sql, isRemoteQuery);
    connection.all(filteredQuery, (err, result) => {
      if (err) reject(err);
      else resolve(result);
    });
  });
};
// Initialize DuckDB with optimal settings for Lambda - based on the wiki documentation
const initializeDuckDB = async (): Promise<number> => {
  const startTime = Date.now();
  try {
    // Set the temporary directory for the Lambda environment
    await executeQuery(`SET home_directory='/tmp';`, false);
    // Install and load essential extensions
    await executeQuery(`INSTALL httpfs;`, false);
    await executeQuery(`LOAD httpfs;`, false);
    // Enable the Lambda-optimized extension repository
    await executeQuery(
      `SET custom_extension_repository = 'https://extensions.quacking.cloud';`,
      false
    );
    // Performance optimizations from the wiki
    // Enable HTTP metadata caching - caches HTTP headers to avoid repeated requests
    await executeQuery(`SET enable_http_metadata_cache=true;`, false);
    // Enable object caching - stores Parquet metadata for a ~4x speedup
    await executeQuery(`SET enable_object_cache=true;`, false);
    // Security configurations from the wiki
    // Disable local filesystem access to protect the Lambda environment
    await executeQuery(`SET disabled_filesystems = 'LocalFileSystem';`, false);
    // Lock the configuration to prevent runtime changes
    await executeQuery(`SET lock_configuration = true;`, false);
    // Optional: configure AWS credentials if needed
    // Uncomment if not using IAM role credentials
    /*
    await executeQuery(`SET s3_region='${process.env.AWS_REGION}';`, false);
    await executeQuery(`SET s3_access_key_id='${process.env.AWS_ACCESS_KEY_ID}';`, false);
    await executeQuery(`SET s3_secret_access_key='${process.env.AWS_SECRET_ACCESS_KEY}';`, false);
    await executeQuery(`SET s3_session_token='${process.env.AWS_SESSION_TOKEN}';`, false);
    */
    isInitialized = true;
    return Date.now() - startTime;
  } catch (error) {
    console.error('DuckDB initialization failed:', error);
    throw error;
  }
};
// Main Lambda handler with metrics
export const handler = metricScope(metrics =>
  async (event: APIGatewayEvent, context: Context) => {
    // Set up metrics dimensions
    metrics.putDimensions({ Service: 'DuckDBQueryService' });
    metrics.setProperty('RequestId', context.awsRequestId);
    try {
      // Validate the request
      if (!event.body) {
        return {
          statusCode: 400,
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ error: 'Request body is required' })
        };
      }
      // Parse the request body
      const { query, options = {} } = JSON.parse(event.body);
      if (!query) {
        return {
          statusCode: 400,
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ error: 'Query parameter is required' })
        };
      }
      // Initialize DuckDB on cold start
      if (!isInitialized) {
        const initDuration = await initializeDuckDB();
        metrics.putMetric('InitializationDuration', initDuration, Unit.Milliseconds);
        console.log(`DuckDB initialized in ${initDuration}ms`);
      }
      // Execute the query with timing
      const queryStartTime = Date.now();
      const result = await executeQuery(query);
      const queryDuration = Date.now() - queryStartTime;
      // Record metrics
      metrics.putMetric('QueryDuration', queryDuration, Unit.Milliseconds);
      metrics.putMetric('ResultRows', Array.isArray(result) ? result.length : 0, Unit.Count);
      // Log query details for debugging
      console.log({
        requestId: context.awsRequestId,
        queryDuration,
        resultRows: Array.isArray(result) ? result.length : 0,
        memoryUsed: process.memoryUsage().heapUsed / 1024 / 1024,
        memoryLimit: context.memoryLimitInMB
      });
      // Return a successful response
      return {
        statusCode: 200,
        headers: {
          'Content-Type': 'application/json',
          'X-Query-Duration-Ms': queryDuration.toString(),
          'X-Request-Id': context.awsRequestId
        },
        body: JSON.stringify({
          success: true,
          data: result,
          metadata: {
            queryDuration,
            rowCount: Array.isArray(result) ? result.length : 0,
            requestId: context.awsRequestId
          }
        })
      };
    } catch (error: any) {
      // Log error details
      console.error('Query execution failed:', {
        requestId: context.awsRequestId,
        error: error.message,
        stack: error.stack
      });
      // Record the error metric
      metrics.putMetric('QueryErrors', 1, Unit.Count);
      // Return an error response
      return {
        statusCode: 400,
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          success: false,
          error: error.message || 'Query execution failed',
          requestId: context.awsRequestId
        })
      };
    }
  }
);
// Handle Lambda runtime shutdown gracefully; exit only once the close callbacks
// have run, otherwise process.exit would preempt them
process.on('SIGTERM', () => {
  console.log('SIGTERM received, closing DuckDB connection');
  connection.close((err) => {
    if (err) console.error('Error closing connection:', err);
    duckDB.close((dbErr) => {
      if (dbErr) console.error('Error closing database:', dbErr);
      process.exit(0);
    });
  });
});
/**
 * Example queries that work with this handler:
 *
 * 1. Query S3 Parquet files:
 *    SELECT * FROM read_parquet('s3://bucket/file.parquet') LIMIT 10
 *
 * 2. Aggregate data:
 *    SELECT COUNT(*), AVG(amount) FROM read_parquet('s3://bucket/data/*.parquet')
 *
 * 3. Join multiple sources:
 *    SELECT * FROM read_parquet('s3://bucket/users.parquet') u
 *    JOIN read_csv_auto('https://example.com/orders.csv') o ON u.id = o.user_id
 *
 * 4. Create temporary views:
 *    CREATE TEMP VIEW sales AS SELECT * FROM read_parquet('s3://bucket/sales.parquet');
 *    SELECT date_trunc('month', date) as month, SUM(amount) FROM sales GROUP BY month
 */
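
// For completeness, a minimal client-side sketch of calling this handler once
// deployed behind the API Gateway endpoint defined in serverless.yml below.
// ENDPOINT and API_KEY are placeholders, not values from this gist; the
// request/response shapes mirror the handler above. Runs on Node 20 (global fetch).
const ENDPOINT = 'https://example.execute-api.us-east-1.amazonaws.com/dev/v1/query';
const API_KEY = process.env.DUCKDB_API_KEY ?? '';

async function runQuery(query: string): Promise<unknown> {
  const response = await fetch(ENDPOINT, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Api-Key': API_KEY, // required because the endpoint is marked private
    },
    body: JSON.stringify({ query }),
  });
  const payload = await response.json();
  if (!response.ok) {
    throw new Error(`Query failed: ${JSON.stringify(payload)}`);
  }
  return payload.data; // result rows, as serialized by the handler
}

// Usage:
// runQuery("SELECT 42 AS answer").then(console.log).catch(console.error);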
# Production-Ready Serverless Configuration for DuckDB Lambda
service: duckdb-analytics
frameworkVersion: '3'

# Plugins for enhanced functionality
plugins:
  - serverless-iam-roles-per-function # Fine-grained IAM permissions
  - serverless-prune-plugin # Automatic version cleanup
  - serverless-esbuild # TypeScript compilation and bundling

custom:
  # API versioning
  api:
    version: 'v1'

  # DuckDB layer versions (check for the latest at https://github.com/tobilg/duckdb-nodejs-layer)
  duckdbLayers:
    x86: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-nodejs-x86:18'
    arm64: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-nodejs-arm64:16'
    extensions: 'arn:aws:lambda:${self:provider.region}:041475135427:layer:duckdb-extensions-nodejs-x86:6'

  # S3 bucket configuration for data access
  dataBucket: ${env:DATA_BUCKET, 'my-analytics-data'}

  # S3 Express One Zone configuration (optional, uncomment to enable)
  # s3Express:
  #   availabilityZoneId: 'use1-az4' # Choose based on your region
  #   bucketName: 'duckdb-express--${self:custom.s3Express.availabilityZoneId}--x-s3'

  # esbuild configuration for optimal bundling
  esbuild:
    bundle: true
    minify: false # Set to true for production
    sourcemap: true
    exclude:
      - 'duckdb' # Provided by the Lambda layer
      - 'aws-lambda' # AWS Lambda types
      - 'dtrace-provider' # Optional dependency
    target: 'node20'
    platform: 'node'
    format: 'cjs'

  # Prune old Lambda versions automatically
  prune:
    automatic: true
    number: 3 # Keep the last 3 versions
provider:
  name: aws
  runtime: nodejs20.x
  architecture: x86_64 # or arm64 for Graviton2
  region: ${opt:region, 'us-east-1'}
  stage: ${opt:stage, 'dev'}

  # CloudWatch Logs retention
  logRetentionInDays: 7 # Adjust based on requirements

  # API Gateway configuration
  apiGateway:
    # API keys for authentication
    apiKeys:
      - name: DuckDBQueryKey
        description: 'API key for DuckDB query endpoint'
    # Usage plan for rate limiting
    usagePlan:
      quota:
        limit: 10000
        period: DAY
      throttle:
        burstLimit: 100
        rateLimit: 50

  # Global environment variables
  environment:
    # AWS SDK optimizations
    AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
    NODE_OPTIONS: '--enable-source-maps'
    # Application settings
    STAGE: '${self:provider.stage}'
    LOG_LEVEL: '${env:LOG_LEVEL, "info"}'
    DATA_BUCKET: '${self:custom.dataBucket}'
    # DuckDB settings
    DUCKDB_CACHE_ENABLED: 'true'
    DUCKDB_MEMORY_LIMIT: '1GB' # Adjust based on Lambda memory
    # S3 Express endpoint (uncomment if using S3 Express)
    # AWS_S3_EXPRESS_ENDPOINT: 's3express-${self:custom.s3Express.availabilityZoneId}.${self:provider.region}.amazonaws.com'

  # Default Lambda configuration
  timeout: 30 # API Gateway cuts requests off at 29 seconds, so this is effectively the maximum
  memorySize: 2048 # Adjust based on workload

  # X-Ray tracing for performance monitoring
  tracing:
    lambda: true
    apiGateway: true
functions:
  # Main query endpoint with API Gateway
  query:
    handler: src/functions/query.handler
    description: 'Execute DuckDB queries via REST API'
    memorySize: 2048 # Override the default if needed
    timeout: 30
    reservedConcurrency: 10 # Caps concurrency; use provisioned concurrency to reduce cold starts
    # Function-specific environment variables
    environment:
      QUERY_TIMEOUT: '25000' # 25 seconds, leaving a buffer before the Lambda timeout
    # Lambda layers
    layers:
      - ${self:custom.duckdbLayers.x86}
    # IAM permissions for this function
    iamRoleStatements:
      # S3 read permissions for the data bucket
      - Effect: Allow
        Action:
          - s3:GetObject
          - s3:GetObjectVersion
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}/*'
      - Effect: Allow
        Action:
          - s3:ListBucket
          - s3:GetBucketLocation
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
      # CloudWatch metrics
      - Effect: Allow
        Action:
          - cloudwatch:PutMetricData
        Resource: '*'
    # API Gateway event
    events:
      - http:
          path: ${self:custom.api.version}/query
          method: post
          cors:
            origin: '*' # Configure for your domain in production
            headers:
              - Content-Type
              - X-Api-Key
              - X-Request-Id
          private: true # Requires an API key
          request:
            schemas:
              application/json: ${file(schemas/query-request.json)}
  # Streaming query endpoint with a Function URL
  streamingQuery:
    handler: src/functions/streamingQuery.handler
    description: 'Stream large query results via Apache Arrow'
    memorySize: 3008 # Higher memory for streaming
    timeout: 900 # 15 minutes max
    # Function URL configuration
    url:
      invokeMode: RESPONSE_STREAM
      cors:
        allowOrigins:
          - '*' # Configure for production
        allowHeaders:
          - Content-Type
          - X-Request-Id
        maxAge: 86400
    layers:
      - ${self:custom.duckdbLayers.x86}
      - ${self:custom.duckdbLayers.extensions} # Additional extensions
    iamRoleStatements:
      # Same S3 permissions as the query function
      - Effect: Allow
        Action:
          - s3:GetObject
          - s3:GetObjectVersion
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}/*'
      - Effect: Allow
        Action:
          - s3:ListBucket
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
  # Batch processing function (invoked by EventBridge or S3 events)
  batchProcessor:
    handler: src/functions/batchProcessor.handler
    description: 'Process data files on S3 upload'
    memorySize: 10240 # Maximum memory for large batches
    timeout: 900
    layers:
      - ${self:custom.duckdbLayers.x86}
      - ${self:custom.duckdbLayers.extensions}
    iamRoleStatements:
      # Read from the raw/ prefix
      - Effect: Allow
        Action:
          - s3:GetObject
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}/raw/*'
      # Write to the processed/ prefix
      - Effect: Allow
        Action:
          - s3:PutObject
          - s3:PutObjectAcl
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}/processed/*'
      - Effect: Allow
        Action:
          - s3:ListBucket
        Resource: 'arn:aws:s3:::${self:custom.dataBucket}'
    events:
      # Trigger on S3 uploads
      - s3:
          bucket: ${self:custom.dataBucket}
          event: s3:ObjectCreated:*
          rules:
            - prefix: raw/
            - suffix: .csv
      # Scheduled processing
      - schedule:
          rate: rate(1 hour)
          enabled: false # Enable when ready
          input:
            action: 'hourly-aggregation'
  # S3 Express One Zone function (uncomment to enable)
  # queryS3Express:
  #   handler: src/functions/queryS3Express.handler
  #   description: 'Ultra-low latency queries with S3 Express'
  #   memorySize: 10240 # Maximum memory
  #   timeout: 30
  #
  #   layers:
  #     - ${self:custom.duckdbLayers.extensions}
  #
  #   vpc:
  #     securityGroupIds:
  #       - !GetAtt VpcEndpointLambdaSecurityGroup.GroupId
  #     subnetIds:
  #       - !GetAtt PrivateSubnet.SubnetId
  #
  #   iamRoleStatements:
  #     - Effect: Allow
  #       Action:
  #         - s3express:CreateSession
  #       Resource:
  #         - !Sub 'arn:aws:s3express:${AWS::Region}:${AWS::AccountId}:bucket/${self:custom.s3Express.bucketName}'
  #
  #   events:
  #     - http:
  #         path: ${self:custom.api.version}/query-express
  #         method: post
  #         cors: true
  #         private: true
# CloudFormation resources
resources:
  Resources:
    # S3 bucket for query results (optional)
    QueryResultsBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:service}-query-results-${self:provider.stage}
        LifecycleConfiguration:
          Rules:
            - Id: DeleteOldResults
              Status: Enabled
              ExpirationInDays: 7 # Auto-delete after 7 days
        PublicAccessBlockConfiguration:
          BlockPublicAcls: true
          BlockPublicPolicy: true
          IgnorePublicAcls: true
          RestrictPublicBuckets: true
        BucketEncryption:
          ServerSideEncryptionConfiguration:
            - ServerSideEncryptionByDefault:
                SSEAlgorithm: AES256

    # CloudWatch dashboard for monitoring
    DuckDBDashboard:
      Type: AWS::CloudWatch::Dashboard
      Properties:
        DashboardName: ${self:service}-${self:provider.stage}
        DashboardBody: !Sub |
          {
            "widgets": [
              {
                "type": "metric",
                "properties": {
                  "metrics": [
                    ["AWS/Lambda", "Invocations", {"stat": "Sum"}],
                    [".", "Errors", {"stat": "Sum"}],
                    [".", "Duration", {"stat": "Average"}],
                    [".", "ConcurrentExecutions", {"stat": "Maximum"}]
                  ],
                  "period": 300,
                  "stat": "Average",
                  "region": "${AWS::Region}",
                  "title": "Lambda Metrics"
                }
              }
            ]
          }
    # Alarms for monitoring
    QueryErrorAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: ${self:service}-${self:provider.stage}-query-errors
        AlarmDescription: 'Alert when the query function has errors'
        MetricName: Errors
        Namespace: AWS/Lambda
        Dimensions:
          - Name: FunctionName
            Value: ${self:service}-${self:provider.stage}-query
        Statistic: Sum
        Period: 300
        EvaluationPeriods: 1
        Threshold: 5
        ComparisonOperator: GreaterThanThreshold
        TreatMissingData: notBreaching
  # Outputs for reference
  Outputs:
    QueryApiUrl:
      Description: 'API Gateway URL for queries'
      Value: !Sub 'https://${ApiGatewayRestApi}.execute-api.${AWS::Region}.amazonaws.com/${self:provider.stage}'
      Export:
        Name: ${self:service}-${self:provider.stage}-api-url
    QueryResultsBucket:
      Description: 'S3 bucket for query results'
      Value: !Ref QueryResultsBucket
      Export:
        Name: ${self:service}-${self:provider.stage}-results-bucket
    DashboardUrl:
      Description: 'CloudWatch Dashboard URL'
      Value: !Sub 'https://console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${self:service}-${self:provider.stage}'
# Package individually for optimized deployments
package:
  individually: true
  patterns:
    - '!.git/**'
    - '!.gitignore'
    - '!.DS_Store'
    - '!npm-debug.log'
    - '!.serverless/**'
    - '!.serverless_plugins/**'
    - '!tests/**'
    - '!.vscode/**'
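
The `query` function above references `schemas/query-request.json` for API Gateway request validation, but that file is not part of this gist. A minimal sketch of what it might contain, assuming the request shape the handler expects (a required `query` string plus an optional `options` object; API Gateway models use JSON Schema draft-04):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "QueryRequest",
  "type": "object",
  "required": ["query"],
  "properties": {
    "query": {
      "type": "string",
      "minLength": 1
    },
    "options": {
      "type": "object"
    }
  }
}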