Last active
August 13, 2025 20:36
-
-
Save filipeandre/238928e7aaf79c908a548c7174980557 to your computer and use it in GitHub Desktop.
Count alarms based on -p suffix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Count AWS CloudWatch alarms: | |
| - If three alarms exist with the same base name and end with p1, p2, p3 (e.g., foo-p1, foo-p2, foo-p3), | |
| they count as 1 (a "triad"). | |
| - All other alarms are counted separately. | |
| Usage: | |
| python count_alarms.py [--region eu-west-1] [--role-arn arn:aws:iam::123456789012:role/RoleName] | |
| """ | |
import argparse
import os
import re
import sys
from typing import Any, Dict, List, Optional, Set, Tuple

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
# Matches a trailing priority suffix at the very end of an alarm name: an
# optional single '-' or '_' separator, the letter 'p' (either case, because of
# the inline `i` flag) and one digit from 1 to 3. Group 1 captures the digit.
# The inline `x` flag enables verbose mode, so whitespace and the '#' comments
# inside the pattern are ignored by the regex engine.
SUFFIX_RE = re.compile(r"""(?ix) # case-insensitive, verbose
(?:[-_]?p([123]))$ # match optional '-' or '_' then 'p' and 1/2/3 at end of string
""")
def assume_role_session(
    role_arn: str,
    session_name: str = "count-alarms-session",
    region_name: Optional[str] = None,
) -> boto3.Session:
    """Assume an IAM role through STS and return a session using its credentials.

    Args:
        role_arn: ARN of the role to assume.
        session_name: Value sent to STS as ``RoleSessionName``.
        region_name: Region used for both the STS client and the returned
            session. ``None`` is passed through unchanged to boto3.

    Returns:
        A ``boto3.Session`` built from the ``AccessKeyId``, ``SecretAccessKey``
        and ``SessionToken`` found in the ``Credentials`` of the STS response.

    Errors raised by the STS call are not caught here; they propagate to the
    caller.
    """
    sts = boto3.client("sts", region_name=region_name)
    creds = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)["Credentials"]
    return boto3.Session(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
        region_name=region_name,
    )
def get_cloudwatch_client(session: boto3.Session) -> Any:
    """Create a CloudWatch client from *session* with explicit retry settings.

    Args:
        session: Session the client is created from.

    Returns:
        The object returned by ``session.client("cloudwatch", ...)``. It is
        annotated as ``typing.Any`` (the builtin ``any`` is a function, not a
        type) because no more specific client type is imported in this file.
    """
    # "standard" retry mode with up to 10 attempts per call.
    retry_config = Config(retries={"max_attempts": 10, "mode": "standard"})
    return session.client("cloudwatch", config=retry_config)
def paginate_alarms(cw) -> List[Dict]:
    """Collect every alarm returned by the ``describe_alarms`` paginator.

    Args:
        cw: Client object exposing ``get_paginator``.

    Returns:
        A single list holding, page by page, the entries under
        ``"MetricAlarms"`` followed by those under ``"CompositeAlarms"``.
        A key missing from a page contributes nothing.
    """
    collected: List[Dict] = []
    pages = cw.get_paginator("describe_alarms").paginate()
    for page in pages:
        # Keep the per-page order: metric alarms first, then composite alarms.
        for key in ("MetricAlarms", "CompositeAlarms"):
            collected += page.get(key, [])
    return collected
def split_base_and_level(name: str) -> Tuple[str, str]:
    """Split an alarm name into its base name and its p-level digit.

    Returns:
        ``(base_name, level)``. ``level`` is ``'1'``, ``'2'`` or ``'3'`` when
        *name* ends with p1/p2/p3 (optionally preceded by '-' or '_');
        otherwise ``level`` is ``''`` and ``base_name`` is *name* stripped of
        surrounding whitespace.
    """
    match = SUFFIX_RE.search(name)
    if match is None:
        return name.strip(), ""

    # Everything before the matched suffix, with any leftover separators or
    # spaces trimmed from the end and whitespace trimmed from both sides.
    prefix = name[: match.start()]
    return prefix.rstrip("-_ ").strip(), match.group(1)
def count_triads_and_others(alarm_names: List[str]) -> Tuple[int, int, int, int]:
    """Count complete p1/p2/p3 triads and everything else.

    Args:
        alarm_names: Alarm names to classify.

    Returns:
        ``(total_alarms, triad_groups, triad_alarm_members, other_alarms)``:

        - ``total_alarms``: ``len(alarm_names)``.
        - ``triad_groups``: number of base names for which levels 1, 2 and 3
          were all seen.
        - ``triad_alarm_members``: always ``triad_groups * 3``.
        - ``other_alarms``: ``total_alarms - triad_alarm_members``, never
          negative. This covers non-suffixed alarms, members of incomplete
          groups, and extra names that map to an already-seen base/level
          (e.g. both ``foo-p1`` and ``foo_p1``).
    """
    triad_levels = {"1", "2", "3"}

    # Levels observed per base name; a set, so repeated levels collapse.
    levels_by_base: Dict[str, Set[str]] = {}
    for alarm_name in alarm_names:
        base, level = split_base_and_level(alarm_name)
        if level in triad_levels:
            levels_by_base.setdefault(base, set()).add(level)

    triad_groups = sum(
        1 for levels in levels_by_base.values() if triad_levels.issubset(levels)
    )
    triad_alarm_members = 3 * triad_groups

    total_alarms = len(alarm_names)
    # Derive "others" from the totals rather than summing categories, so names
    # that collapse onto the same base/level are still counted. Clamped to
    # stay non-negative.
    other_alarms = max(0, total_alarms - triad_alarm_members)
    return total_alarms, triad_groups, triad_alarm_members, other_alarms
def main():
    """Parse CLI options, list CloudWatch alarms and print the triad summary.

    ``--region`` defaults to ``AWS_REGION``, then ``AWS_DEFAULT_REGION``; when
    ``--role-arn`` is given the role is assumed before listing alarms. On a
    ``BotoCoreError`` or ``ClientError`` the error is written to stderr and the
    process exits with status 1.
    """
    env_region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")

    parser = argparse.ArgumentParser(description="Count CloudWatch alarms with p1/p2/p3 triad collapsing.")
    parser.add_argument(
        "--region",
        default=env_region,
        help="AWS region (defaults to AWS_REGION/AWS_DEFAULT_REGION or the SDK default).",
    )
    parser.add_argument("--role-arn", help="Optional role ARN to assume before listing alarms.")
    args = parser.parse_args()

    try:
        session = (
            assume_role_session(args.role_arn, region_name=args.region)
            if args.role_arn
            else boto3.Session(region_name=args.region)
        )
        region_to_show = session.region_name or "AWS SDK default"

        client = get_cloudwatch_client(session)
        # Keep only alarms that actually carry a non-empty name.
        names = [
            alarm.get("AlarmName")
            for alarm in paginate_alarms(client)
            if alarm.get("AlarmName")
        ]

        total, triads, members, others = count_triads_and_others(names)
        effective_total = triads + others

        report = [
            f"Region: {region_to_show}",
            f"Raw alarms found: {total}",
            f"Complete triad groups (p1+p2+p3 counted as 1 each): {triads}",
            f"Alarms participating in complete triads: {members}",
            f"All other alarms (including partial groups and no suffix): {others}",
            "-" * 60,
            f"Effective total (triads collapsed): {effective_total}",
        ]
        for line in report:
            print(line)
    except (BotoCoreError, ClientError) as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
python3 -c 'import urllib.request as u; s=u.urlopen("https://gist.githubusercontent.com/filipeandre/238928e7aaf79c908a548c7174980557/raw/90c1b519c0b991d36eaf73c0fe9ec97d4ed64384/count_alarms.py").read().decode(); s=s.replace("f\"","f"").replace(")\"", ")""); exec(compile(s,"","exec"), {"__name__":"__main__"})'