Skip to content

Instantly share code, notes, and snippets.

@bneutra
Last active April 7, 2025 18:05
Show Gist options
  • Save bneutra/9389516cfbe63e2c2fb291bdda08db63 to your computer and use it in GitHub Desktop.
Save bneutra/9389516cfbe63e2c2fb291bdda08db63 to your computer and use it in GitHub Desktop.
rds_dd_monitor.py
#!/usr/bin/env python3
import json
import os
import sys
from datetime import datetime, timedelta
from typing import Optional

import boto3
import requests
# ID of the Datadog monitor that tracks the RDS snapshot count.
MONITOR_ID = 134246623
# AWS regions that are searched for matching RDS instances.
REGIONS = ["us-east-1", "us-west-2", "eu-central-1"]

# Datadog credentials are read from the environment. Set them with:
#   export DATADOG_API_KEY="your_api_key"
#   export DATADOG_APP_KEY="your_app_key"
API_KEY = os.getenv('DATADOG_API_KEY')
APP_KEY = os.getenv('DATADOG_APP_KEY')

# Both keys are required; bail out early when either one is missing or empty.
if not (API_KEY and APP_KEY):
    print("Error: Datadog API and APP keys must be set as environment variables.")
    print("Export DATADOG_API_KEY and DATADOG_APP_KEY in your shell.")
    sys.exit(1)

# Base URL of the Datadog v1 API.
API_URL = "https://api.datadoghq.com/api/v1"

# Headers attached to every Datadog API request.
HEADERS = {
    "Content-Type": "application/json",
    "DD-API-KEY": API_KEY,
    "DD-APPLICATION-KEY": APP_KEY,
}
def get_monitor_events(monitor_id, hours=24):
    """
    Return the deploy names of the monitor's groups that are not OK.

    Fetches the monitor (with all group states) from the Datadog API and
    collects the name of every group whose status is anything other than
    "OK", with the "deploy:" text removed from the name.

    Args:
        monitor_id (int): The ID of the monitor to query
        hours (int): Number of hours to look back. NOTE: the window is only
            printed; the monitor request itself is not filtered by time.

    Returns:
        list: Group names (with "deploy:" removed) whose status is not "OK".
            Empty when the API does not answer with HTTP 200.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    # Calculate time window (informational only, see the note on `hours`).
    now = datetime.now()
    start_time = int((now - timedelta(hours=hours)).timestamp())
    end_time = int(now.timestamp())
    print(f"Fetching events from {datetime.fromtimestamp(start_time)} to {datetime.fromtimestamp(end_time)}")

    # Without an explicit timeout, requests may block forever on a stalled
    # connection.
    monitor_response = requests.get(
        f"{API_URL}/monitor/{monitor_id}",
        headers=HEADERS,
        params={"group_states": "all"},
        timeout=30,
    )
    if monitor_response.status_code != 200:
        print(f"Error fetching monitor details: {monitor_response.status_code} - {monitor_response.text}")
        return []

    # Tolerate a monitor payload without state/groups instead of raising
    # KeyError.
    state = monitor_response.json().get("state") or {}
    groups = state.get("groups") or {}

    results = []
    for group in groups.values():
        if group.get("status") == "OK":
            continue
        name = group.get("name")
        if not name:
            # A group without a name cannot be mapped to a deploy; skip it
            # rather than crash on None.replace().
            continue
        results.append(name.replace("deploy:", ""))
    return results
def get_resource_client(region: str):
    """Create a boto3 "resourcegroupstaggingapi" client for ``region``."""
    tagging_client = boto3.client(
        "resourcegroupstaggingapi",
        region_name=region,
    )
    return tagging_client
def get_db_arn(deploy: str, region: str) -> Optional[str]:
    """
    Look up the ARN of the RDS instance tagged for a deploy in one region.

    Queries the tagging API for "rds:db" resources tagged with
    Deploy=<deploy> and Role=core and returns the first match.

    Args:
        deploy (str): Value of the "Deploy" tag to match
        region (str): Region passed to get_resource_client()

    Returns:
        Optional[str]: ARN of the first matching resource, or None when the
            response contains no matching resources.
    """
    client = get_resource_client(region)
    response = client.get_resources(
        ResourceTypeFilters=["rds:db"],
        TagFilters=[
            {"Key": "Deploy", "Values": [deploy]},
            {"Key": "Role", "Values": ["core"]},
        ],
    )
    # Only the first page of results is inspected, and only its first entry
    # is returned.
    resources = response.get("ResourceTagMappingList") or []
    if not resources:
        return None
    return resources[0]["ResourceARN"]
def main():
    """Print the ARN of every matching RDS instance, then the total count.

    For each region in REGIONS and each name returned by
    get_monitor_events(), looks up the ARN via get_db_arn() and prints it
    when one is found.
    """
    deploys = get_monitor_events(MONITOR_ID)
    found = 0
    for region in REGIONS:
        for deploy in deploys:
            arn = get_db_arn(deploy, region)
            if not arn:
                # No matching resource for this deploy in this region.
                continue
            found += 1
            print(arn)
    print(f"{found} events found.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment