Skip to content

Instantly share code, notes, and snippets.

View jdaarevalo's full-sized avatar

José David Arévalo jdaarevalo

View GitHub Profile
@jdaarevalo
jdaarevalo / last-section-docker-compose.yaml
Last active January 30, 2021 12:14
last-section-docker-compose
...
volumes:
- ./postgres-data:/var/lib/postgresql/data
# mount the SQL script that creates the tables
- ./sql/create_tables.sql:/docker-entrypoint-initdb.d/create_tables.sql
# mount the SQL script that fills the tables
- ./sql/fill_tables.sql:/docker-entrypoint-initdb.d/fill_tables.sql
@jdaarevalo
jdaarevalo / query_send_report.py
Last active August 26, 2022 06:52
Lambda function query_send_report
"""
Purpose
Run an Athena query and post the location of the CSV output in Slack
Tag the user who made the request
"""
import os
import json
import time
@jdaarevalo
jdaarevalo / template.yaml
Last active April 21, 2023 18:40
create glue table
GlueRawDataNYTimesCovidTable:
Type: 'AWS::Glue::Table'
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref GlueRawDataBase
TableInput:
Description: "Raw Data on COVID-19 cases from NY Times at US state level."
TableType: "EXTERNAL_TABLE"
Retention: 0
Name: covid_nytimes_states
ProcessedDataS3Bucket:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: !Join
- "-"
- - "scatter-gather-processed-data"
- !Select
- 0
- !Split
- "-"
ProcessorFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: processor_lambda.lambda_handler
Runtime: python3.9
Architectures:
- x86_64
Policies:
- AmazonAthenaFullAccess
import os
import json
import awswrangler as wr
from datetime import datetime
from aws_lambda_powertools import Logger
logger = Logger()
ATHENA_RAW_DATABASE_NAME = os.getenv('ATHENA_RAW_DATABASE_NAME')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
SGProcessesDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Ref SGProcessesTableName
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
KeySchema:
-
AttributeName: "scatter_gather_id"
ScatterFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: scatter_lambda.lambda_handler
Runtime: python3.9
Architectures:
- x86_64
Policies:
- AmazonDynamoDBFullAccess
@logger.inject_lambda_context
def lambda_handler(event, context):
logger.info({"action":"invoke_lambda", "payload":{"event":event}})
timestamp = int(time.time())
# read the distinct states from the Athena table
query = "select distinct state from covid_nytimes_states"
unique_states = wr.athena.read_sql_query(query, database=ATHENA_RAW_DATABASE_NAME)
# write to the SG_AGGREGATE_TABLE_NAME DynamoDB table how many states should be processed
item = {
from dynamo_operations import update_item_finished
...
SG_PROCESSES_TABLE_NAME = os.getenv('SG_PROCESSES_TABLE_NAME')
...
@logger.inject_lambda_context
def lambda_handler(event, context):
...