Last active
October 1, 2022 18:59
-
-
Save slotrans/cf065be96f4c5563237c5dbef282d982 to your computer and use it in GitHub Desktop.
lambda function to track data lake inventory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# If you have a data lake, you will often want to be able to ask questions about what's in it, and prefix-based listings of
# objects, as provided by S3 and all S3-alikes, tightly constrain your ability to do so. Using a simple Lambda function like
# this (which can be adapted to the FaaS platform of your choice) together with an RDBMS gives you a much more flexible way of
# asking meta-questions about what's in your lake.

# Relevant table schema; adjust names as you like...
#
# create table lake.inventory
# (   inventory_id  bigserial primary key
#   , created       timestamptz not null default now()
#   , bucket        varchar(64) not null
#   , key           text not null
#   , size          bigint not null
#   , event_time    timestamptz not null
# );
#
# create unique index uidx_inventory_bucketkey on lake.inventory(bucket, key);
from urllib.parse import unquote, unquote_plus

import boto3
import psycopg2
# Upsert: insert a new inventory row; on a (bucket, key) conflict, refresh
# size and event_time instead (an overwritten object replaces its row).
INSERT_SQL = """
insert into lake.inventory
( bucket
, key
, size
, event_time
)
values
( %(bucket)s
, %(key)s
, %(size)s
, cast(%(event_time)s as timestamptz)
)
on conflict (bucket, key)
do update set size = excluded.size
, event_time = excluded.event_time
""".strip()

# Remove the inventory row tracking a deleted object.
DELETE_SQL = "delete from lake.inventory where ( bucket, key ) = ( %(bucket)s, %(key)s )"

# Could pull these from env vars of course, but we treated them as Well Known constants that didn't need to be configurable.
# You may be tempted to put ALL of these in the secret. Don't! Hostnames and usernames are not secrets!
DBHOST = 'your.db.host.name.here'
DBUSER = 'dont_connect_as_a_superuser'

# Module-level caches: warm Lambda containers reuse the module, so the
# connection and the secret are fetched only once per container, not per event.
DBCONN = None  # lazily populated by get_connection()
DBPW = None  # lazily populated by get_db_password()
def get_db_password():
    """Return the database password, fetching it from AWS Secrets Manager.

    The value is cached in the module global ``DBPW`` so the secret is
    retrieved at most once per warm Lambda container.
    """
    global DBPW
    if DBPW is None:
        secrets = boto3.client('secretsmanager')
        result = secrets.get_secret_value(SecretId='YourSecretKeyNameHere')
        DBPW = result['SecretString']
    return DBPW
def get_connection():
    """Return a psycopg2 connection, creating and caching it on first use.

    The connection lives in the module global ``DBCONN`` so warm Lambda
    invocations reuse it. autocommit is enabled: each execute stands alone,
    so no explicit commit/rollback management is needed per event.
    """
    global DBCONN
    if DBCONN is None:
        DBCONN = psycopg2.connect(
            host=DBHOST,
            database='your-database',
            user=DBUSER,
            password=get_db_password(),
        )
        DBCONN.autocommit = True
    return DBCONN
# set the Lambda entry point to this function | |
def handler(event, context):
    """Lambda entry point: route each S3 notification record by event type.

    ObjectCreated:* events upsert an inventory row; ObjectRemoved:* events
    delete one. Any other event type is ignored. Always returns True.
    """
    for record in event['Records']:
        name = record['eventName']
        if name.startswith('ObjectCreated'):
            handle_put(record)
        elif name.startswith('ObjectRemoved'):
            handle_delete(record)
    return True
def handle_put(record):
    """Upsert one S3 ObjectCreated event record into lake.inventory.

    Reads bucket name, object key, object size, and the event timestamp from
    the S3 notification record and runs INSERT_SQL (insert-or-update keyed on
    (bucket, key)) on the shared autocommit connection.
    """
    # S3 event notifications URL-encode the object key, using '+' for spaces
    # (e.g. 'red flower.jpg' arrives as 'red+flower.jpg'). unquote() would
    # leave the '+' intact, so unquote_plus() is required to recover the key.
    bucket = record['s3']['bucket']['name']
    key = unquote_plus(record['s3']['object']['key'])
    size = record['s3']['object']['size']
    event_time = record['eventTime']
    connection = get_connection()
    with connection.cursor() as cur:
        cur.execute(INSERT_SQL, dict(bucket=bucket, key=key, size=size, event_time=event_time))
def handle_delete(record):
    """Delete the lake.inventory row for one S3 ObjectRemoved event record.

    Reads bucket name and object key from the S3 notification record and runs
    DELETE_SQL on the shared autocommit connection.
    """
    # S3 event notifications URL-encode the object key, using '+' for spaces,
    # so unquote_plus() (not unquote()) must be used — otherwise keys with
    # spaces would never match the row written by handle_put.
    bucket = record['s3']['bucket']['name']
    key = unquote_plus(record['s3']['object']['key'])
    connection = get_connection()
    with connection.cursor() as cur:
        cur.execute(DELETE_SQL, dict(bucket=bucket, key=key))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment