Last active
April 7, 2025 18:05
-
-
Save bneutra/9389516cfbe63e2c2fb291bdda08db63 to your computer and use it in GitHub Desktop.
rds_dd_monitor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import json | |
from datetime import datetime, timedelta | |
import requests | |
import boto3 | |
# ID of the Datadog monitor that tracks the RDS snapshot count
MONITOR_ID = 134246623

# AWS regions that are searched for matching databases
REGIONS = ["us-east-1", "us-west-2", "eu-central-1"]

# Datadog credentials are read from the environment. Set them with:
#   export DATADOG_API_KEY="your_api_key"
#   export DATADOG_APP_KEY="your_app_key"
API_KEY = os.getenv('DATADOG_API_KEY')
APP_KEY = os.getenv('DATADOG_APP_KEY')

# Both keys are required; bail out early when either is missing or empty.
if not (API_KEY and APP_KEY):
    print("Error: Datadog API and APP keys must be set as environment variables.")
    print("Export DATADOG_API_KEY and DATADOG_APP_KEY in your shell.")
    sys.exit(1)

# Base URL of the Datadog v1 API
API_URL = "https://api.datadoghq.com/api/v1"

# Headers sent with every Datadog API request
HEADERS = {
    "Content-Type": "application/json",
    "DD-API-KEY": API_KEY,
    "DD-APPLICATION-KEY": APP_KEY,
}
def get_monitor_events(monitor_id, hours=24):
    """
    Return the deploy names of the monitor's groups that are not in "OK" state.

    The monitor is fetched with ``group_states=all`` and its current per-group
    state is inspected. The ``hours`` window is only used for the
    informational message printed before the request; it does not filter the
    result.

    Args:
        monitor_id (int): The ID of the monitor to query
        hours (int): Number of hours to look back (used in the printed
            time window only)

    Returns:
        list: Group names with the "deploy:" text removed, one per group
            whose status is not "OK". Empty if the request fails, the API
            answers with a non-200 status, or the monitor reports no groups.
    """
    # Calculate time window (truncated to whole seconds for display)
    now = datetime.now()
    start_time = int((now - timedelta(hours=hours)).timestamp())
    end_time = int(now.timestamp())
    print(f"Fetching events from {datetime.fromtimestamp(start_time)} to {datetime.fromtimestamp(end_time)}")

    try:
        monitor_response = requests.get(
            f"{API_URL}/monitor/{monitor_id}",
            headers=HEADERS,
            params={"group_states": "all"},
            # requests never times out by default; avoid hanging forever.
            timeout=30,
        )
    except requests.RequestException as exc:
        # Treat transport failures like HTTP failures: report and return nothing.
        print(f"Error fetching monitor details: {exc}")
        return []

    if monitor_response.status_code != 200:
        print(f"Error fetching monitor details: {monitor_response.status_code} - {monitor_response.text}")
        return []

    # "state" / "groups" may be absent or null when the monitor has no groups.
    state = monitor_response.json().get("state") or {}
    groups = state.get("groups") or {}

    results = []
    for group in groups.values():
        if group.get("status") == "OK":
            continue
        name = group.get("name")
        if not name:
            # A group without a name cannot be mapped to a deploy; skip it.
            continue
        results.append(name.replace("deploy:", ""))
    return results
def get_resource_client(region: str):
    """Create a boto3 client for the "resourcegroupstaggingapi" service in `region`."""
    service_name = "resourcegroupstaggingapi"
    return boto3.client(service_name, region_name=region)
def get_db_arn(deploy: str, region: str) -> "str | None":
    """
    Look up the ARN of the RDS database tagged for a deploy in one region.

    Searches "rds:db" resources tagged ``Deploy=<deploy>`` and ``Role=core``
    through the Resource Groups Tagging API.

    Args:
        deploy: Value of the "Deploy" tag to match.
        region: AWS region to search.

    Returns:
        The ARN of the first matching resource, or None if nothing matches.
    """
    client = get_resource_client(region)
    # get_resources is paginated; walk every page so a match that is not on
    # the first page is still found.
    pages = client.get_paginator("get_resources").paginate(
        ResourceTypeFilters=["rds:db"],
        TagFilters=[
            {"Key": "Deploy", "Values": [deploy]},
            {"Key": "Role", "Values": ["core"]},
        ],
    )
    for page in pages:
        matches = page.get("ResourceTagMappingList") or []
        if matches:
            return matches[0]["ResourceARN"]
    return None
def main():
    """Print the ARN found for each non-OK monitor group, then a total count."""
    failing_deploys = get_monitor_events(MONITOR_ID)

    # Lazy, region-major lookup: every deploy is checked in the first region
    # before moving on to the next one.
    lookups = (
        get_db_arn(deploy, region)
        for region in REGIONS
        for deploy in failing_deploys
    )

    found = 0
    for arn in lookups:
        if not arn:
            continue
        found += 1
        print(arn)
    print(f"{found} events found.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment