Skip to content

Instantly share code, notes, and snippets.

@b-per
Created July 24, 2024 10:19
Show Gist options
  • Save b-per/490bb892bb3c63253097fed7ff87d884 to your computer and use it in GitHub Desktop.
Save b-per/490bb892bb3c63253097fed7ff87d884 to your computer and use it in GitHub Desktop.
Lambda function to read audit logs from dbt Cloud and push them to S3
import json
import boto3
import requests
from datetime import datetime, timedelta, UTC
# Number of audit-log records requested per API page; used both as the
# `limit` query parameter and as the offset stride when paginating.
NUM_RECORDS = 50
def datetime_formatter(dt: datetime):
return dt.isoformat(timespec="milliseconds")[:23] + "Z"
def get_paginated_data(url, headers):
    """Fetch every page of a paginated dbt Cloud API endpoint.

    Args:
        url: Endpoint URL that already carries a ``limit={NUM_RECORDS}``
            query parameter; ``&offset=N`` is appended for later pages.
        headers: HTTP headers to send (must include the Authorization token).

    Returns:
        A list of raw response bodies, one JSON text per page, in order.

    Raises:
        Exception: If any request returns a non-200 status code.
    """
    def _fetch(page_url):
        # Fix: added a timeout so a hung request cannot stall the Lambda
        # until its own execution timeout; previously requests.get could
        # block indefinitely.
        response = requests.get(page_url, headers=headers, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Request failed: {response.text}")
        return response

    first_page = _fetch(url)
    pages = [first_page.text]

    # Total number of items across all pages, as reported by the API.
    total_count = first_page.json()["extra"]["pagination"]["total_count"]

    # Walk the remaining pages, NUM_RECORDS items at a time.
    num_records = NUM_RECORDS
    while num_records < total_count:
        pages.append(_fetch(f"{url}&offset={num_records}").text)
        num_records += NUM_RECORDS

    # All pages returned as a list of raw JSON strings.
    return pages
def lambda_handler(event, context):
    """Read recent dbt Cloud audit logs and write them to S3 as one JSONL file.

    Intended to run on a schedule: it fetches the audit-log entries from
    the last ``lookback_time_minutes`` minutes and stores the raw API
    pages (one JSON document per line) under ``s3_folder_name`` in
    ``s3_bucket_name``. ``event`` and ``context`` are the standard Lambda
    arguments and are not used.

    Returns:
        A dict with ``statusCode`` 200 and a JSON success message.
    """
    # dbt Cloud settings
    dbt_cloud_account_id = 12345
    dbt_cloud_host = "emea.dbt.com"

    # settings for the secret where the dbt Cloud token is stored
    secret_name = "benoit-account-viewer"
    secret_key = "dbt_cloud_account_viewer"

    # information on where to save the data on S3
    s3_bucket_name = "bper-test"
    s3_folder_name = "dbt-cloud-audit-logs"

    # How many minutes to go back when getting the logs; should be equal
    # or higher to the frequency at which this function runs.
    # (Fix: renamed from the misspelled "loopback_time_minutes".)
    lookback_time_minutes = 10

    # Build the [now - lookback, now] time window in the API's format.
    to = datetime.now(UTC)
    to_formatted = datetime_formatter(to)
    from_formatted = datetime_formatter(to - timedelta(minutes=lookback_time_minutes))

    # Retrieve the dbt Cloud token from AWS Secrets Manager.
    aws_sec_manager = boto3.client("secretsmanager")
    aws_sec_manager_response = aws_sec_manager.get_secret_value(SecretId=secret_name)
    secret_string = aws_sec_manager_response["SecretString"]
    dbt_cloud_token = json.loads(secret_string).get(secret_key)

    # Destination object on S3: one new timestamped file per invocation.
    s3_new_file_name = f"audit_log_{to_formatted}.log"
    s3_client = boto3.client("s3")
    s3_file_path = f"{s3_folder_name}/{s3_new_file_name}"

    # Call the audit-log API, paginated.
    dbt_headers = {"Authorization": f"Token {dbt_cloud_token}"}
    dbt_endpoint = f"https://{dbt_cloud_host}/api/v3/accounts/{dbt_cloud_account_id}/audit-logs/?limit={NUM_RECORDS}&logged_at_start={from_formatted}&logged_at_end={to_formatted}"
    data = get_paginated_data(dbt_endpoint, dbt_headers)

    # Join the raw pages with newlines -> JSONL (one API response per line).
    data_with_newlines = "\n".join(data)

    # Upload the JSONL to S3.
    s3_client.put_object(Body=data_with_newlines, Bucket=s3_bucket_name, Key=s3_file_path)

    return {"statusCode": 200, "body": json.dumps("Success: Logs retrieved and saved")}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment