import boto3
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def cpu_usage_metrics_for_instance(
    region: str, instance_id: str, start_time: datetime, duration: timedelta
) -> Dict[str, Dict[str, Optional[float]]]:
    end_time = start_time + duration
    metric_name = "CPUUtilization"
    stats: List[str] = ["Minimum", "Maximum", "Average"]
    # Calculate period in seconds, respecting CloudWatch's constraints
    total_seconds = int(duration.total_seconds())
    if total_seconds <= 3600:  # Up to 1 hour
        period = max(total_seconds // 60, 60)  # Minimum of 60 seconds
    elif total_seconds <= 86400:  # Up to 24 hours
        period = 300  # 5 minutes
    else:
        period = max(3600, (total_seconds // 86400) * 3600)
    # Ensure period is a multiple of 60 seconds
    period = max(60, period - (period % 60))
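    # Worked examples (assumed durations, for illustration only):
    #   30-minute window (1800 s)  -> max(1800 // 60, 60) = 60 s period
    #   6-hour window (21600 s)    -> 300 s period
    #   3-day window (259200 s)    -> max(3600, 3 * 3600) = 10800 s period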
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    metrics = cloudwatch.get_metric_statistics(
        Namespace="AWS/EC2",
        MetricName=metric_name,
        Dimensions=[{"Name": "InstanceId", "Value": instance_id}],
        StartTime=start_time,
        EndTime=end_time,
        Period=period,
        Statistics=stats,
    )

    # Initialize response structure
    metric_stats: Dict[str, Dict[str, Optional[float]]] = {metric_name: {stat: None for stat in stats}}
    # Extract the metrics, aggregating each CloudWatch statistic across all returned datapoints
    if metrics["Datapoints"]:
        for stat in stats:
            values = [datapoint[stat] for datapoint in metrics["Datapoints"] if stat in datapoint]
            if not values:
                continue
            if stat == "Minimum":
                metric_stats[metric_name][stat] = round(min(values), 4)
            elif stat == "Maximum":
                metric_stats[metric_name][stat] = round(max(values), 4)
            else:  # Average of the per-period averages
                metric_stats[metric_name][stat] = round(sum(values) / len(values), 4)
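    # At this point metric_stats looks roughly like this (values illustrative; None when
    # CloudWatch returned no datapoints):
    #   {"CPUUtilization": {"Minimum": 0.08, "Maximum": 4.75, "Average": 1.29}}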
    return metric_stats


def disk_usage_metrics_for_volume(
    region: str, volume_id: str, start_time: datetime, duration: timedelta
) -> Dict[str, Dict[str, float]]:
    # Calculate end time based on start_time and duration
    end_time = start_time + duration
    # Initialize CloudWatch client for the specified region
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    # Minimum period in seconds (60 seconds = 1 minute)
    min_period = 60
    # Define metrics to fetch
    metrics_info = {
        "VolumeReadBytes": {"Unit": "Bytes", "Stats": {}},
        "VolumeWriteBytes": {"Unit": "Bytes", "Stats": {}},
        "VolumeReadOps": {"Unit": "Count", "Stats": {}},
        "VolumeWriteOps": {"Unit": "Count", "Stats": {}},
    }
    # Fetch and calculate metrics for each
    for metric_name, info in metrics_info.items():
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/EBS",
            MetricName=metric_name,
            Dimensions=[{"Name": "VolumeId", "Value": volume_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=min_period,
            Statistics=["Sum"],
            Unit=info["Unit"],
        )

        # Initialize variables for calculations
        total = 0
        data_points_counter = 0
        min_val = float("inf")
        max_val = 0

        # Process datapoints
        for dp in response["Datapoints"]:
            val = dp["Sum"]
            total += val
            data_points_counter += 1
            if val < min_val:
                min_val = val
            if val > max_val:
                max_val = val
        # Calculate min, max, and avg if data is available
        if data_points_counter > 0:
            avg_val = total / data_points_counter
            # Convert per-period byte sums to MB/s for the throughput metrics
            if "Bytes" in metric_name:
                conversion_factor = min_period * 1024 * 1024
            else:  # For IOPS, just divide by the period to get ops/s
                conversion_factor = min_period
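            # Worked example (illustrative numbers): a datapoint Sum of 62,914,560 bytes
            # over a 60-second period is 62,914,560 / (60 * 1024 * 1024) = 1.0 MB/s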
            metrics_info[metric_name]["Stats"]["Minimum"] = round(min_val / conversion_factor, 4)
            metrics_info[metric_name]["Stats"]["Maximum"] = round(max_val / conversion_factor, 4)
            metrics_info[metric_name]["Stats"]["Average"] = round(avg_val / conversion_factor, 4)
        else:
            metrics_info[metric_name]["Stats"] = {"Minimum": 0.0, "Maximum": 0.0, "Average": 0.0}
    # Organize results for better readability
    results = {}
    for metric, info in metrics_info.items():
        metric_type = "ThroughputMBps" if "Bytes" in metric else "IOPS"
        operation = "Read" if "Read" in metric else "Write"
        metric_key = f"{operation}{metric_type}"
        results[metric_key] = info["Stats"]
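    # The result keys end up as ReadThroughputMBps, WriteThroughputMBps, ReadIOPS and
    # WriteIOPS, each mapping to its {"Minimum", "Maximum", "Average"} stats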
    return results


def rds_metrics_for_instance(
    region: str, db_instance_identifier: str, start_time: datetime, duration: timedelta
) -> Dict[str, Dict[str, float]]:
    end_time = start_time + duration
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    min_period = 60
    # Each metric is returned in its native CloudWatch unit (e.g. Percent for
    # CPUUtilization, Seconds for the latencies, Bytes for FreeStorageSpace)
    metrics = [
        "CPUUtilization",
        "DatabaseConnections",
        "ReadIOPS",
        "WriteIOPS",
        "ReadLatency",
        "WriteLatency",
        "FreeStorageSpace",
    ]
    results = {}
    for metric_name in metrics:
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/RDS",
            MetricName=metric_name,
            Dimensions=[{"Name": "DBInstanceIdentifier", "Value": db_instance_identifier}],
            StartTime=start_time,
            EndTime=end_time,
            Period=min_period,
            Statistics=["Minimum", "Maximum", "Average"],
            # Unit is omitted on purpose: forcing "Count" here would filter out datapoints
            # for metrics such as ReadLatency (Seconds) or FreeStorageSpace (Bytes)
        )

        min_val, max_val, avg_val = 0.0, 0.0, 0.0
        if response["Datapoints"]:
            min_val = round(min(dp["Minimum"] for dp in response["Datapoints"]), 4)
            max_val = round(max(dp["Maximum"] for dp in response["Datapoints"]), 4)
            avg_val = round(sum(dp["Average"] for dp in response["Datapoints"]) / len(response["Datapoints"]), 4)
        results[metric_name] = {"Minimum": min_val, "Maximum": max_val, "Average": avg_val}
    return results


def ec2_network_metrics_combined(
    region: str, network_interface_id: str, start_time: datetime, duration: timedelta
) -> Dict[str, Dict[str, float]]:
    end_time = start_time + duration
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    metrics = {"Bytes": ["NetworkIn", "NetworkOut"], "Packets": ["NetworkPacketsIn", "NetworkPacketsOut"]}
    min_period = 60
    results = {}
    for metric_type, metric_names in metrics.items():
        for metric_name in metric_names:
            response = cloudwatch.get_metric_statistics(
                Namespace="AWS/EC2",
                MetricName=metric_name,
                Dimensions=[{"Name": "NetworkInterfaceId", "Value": network_interface_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=min_period,
                Statistics=["Sum"],
            )
            if response["Datapoints"]:
                sums = [dp["Sum"] for dp in response["Datapoints"]]
                if metric_type == "Bytes":
                    # Convert byte sums to megabits (x 8 bits, / 1024^2) before computing rates
                    converted_sums = [(value * 8 / (1024**2)) for value in sums]
                else:
                    converted_sums = sums
                # Divide by the period to turn per-period sums into per-second rates
                min_val = round(min(converted_sums) / min_period, 4)
                max_val = round(max(converted_sums) / min_period, 4)
                avg_val = round(sum(converted_sums) / len(converted_sums) / min_period, 4)
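                # Worked example (illustrative): a 7,864,320-byte Sum in a 60 s period is
                # 7,864,320 * 8 / 1024**2 = 60 megabits, i.e. 60 / 60 = 1.0 Mbps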
            else:
                min_val = max_val = avg_val = 0.0
            unit = "Mbps" if metric_type == "Bytes" else "PPS"
            metric_key = f"{metric_name}{unit}"
            results[metric_key] = {"Min": min_val, "Max": max_val, "Avg": avg_val}
    return results


if __name__ == "__main__":
    region = "us-east-2"
    instance_id = "i-046ab414885b48fee"
    volume_id = "vol-05808092af751c33b"
    eni_id = "eni-06d0753e24553733c"
    rds_id = "insecurestack-unencryptedrdsinstance-howslpcppcwp"
    # datetime.utcnow() is deprecated; use an explicit timezone-aware UTC timestamp
    start_time = datetime.now(timezone.utc) - timedelta(hours=1)
    duration = timedelta(hours=1)

    print(f"{instance_id}: {cpu_usage_metrics_for_instance(region, instance_id, start_time, duration)}")
    print(f"{volume_id}: {disk_usage_metrics_for_volume(region, volume_id, start_time, duration)}")
    print(f"{rds_id}: {rds_metrics_for_instance(region, rds_id, start_time, duration)}")
    print(f"{eni_id}: {ec2_network_metrics_combined(region, eni_id, start_time, duration)}")