Created
March 29, 2026 13:19
-
-
Save Burekasim/3ef9e855694fde93edbe00b988f70650 to your computer and use it in GitHub Desktop.
AWS Cost and Usage report exporter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import csv | |
| from decimal import Decimal, InvalidOperation | |
| from collections import defaultdict | |
| from typing import Dict, Tuple, Iterable, Optional, List | |
# Cost columns to try, in priority order, when --cost-column is not given;
# detect_cost_column() returns the first of these present in a file's header.
DEFAULT_COST_CANDIDATES = [
    "lineItem/NetUnblendedCost",
    "lineItem/UnblendedCost",
    "reservation/EffectiveCost",
    "reservation/NetEffectiveCost",
    "savingsPlan/SavingsPlanEffectiveCost",
    "savingsPlan/NetSavingsPlanEffectiveCost",
    "lineItem/BlendedCost",
]

# CUR column names used for grouping and filtering.
RESOURCE_ID_COL = "lineItem/ResourceId"        # primary grouping key
USAGE_TYPE_COL = "lineItem/UsageType"          # secondary key in resource-usagetype mode
USAGE_AMOUNT_COL = "lineItem/UsageAmount"      # summed per group
USAGE_START_COL = "lineItem/UsageStartDate"    # compared against --start-date/--end-date
PRODUCT_CODE_COL = "lineItem/ProductCode"      # matched against --product-code
USAGE_ACCOUNT_COL = "lineItem/UsageAccountId"  # matched against --usage-account-id
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and return the parsed command-line arguments."""
    p = argparse.ArgumentParser(
        description="Aggregate AWS CUR cost and usage by resource ID, optionally split by usage type."
    )
    p.add_argument("input_files", nargs="+", help="One or more CUR CSV files")
    p.add_argument(
        "--mode",
        choices=["resource", "resource-usagetype"],
        default="resource",
        help="Aggregation mode",
    )
    p.add_argument("--output", default="output.csv", help="Output CSV file path")
    # Help text lists the auto-detection order so users know what they get by default.
    cost_help = (
        "Specific cost column to use. "
        "If omitted, the script auto-selects the first existing column from: "
        + ", ".join(DEFAULT_COST_CANDIDATES)
    )
    p.add_argument("--cost-column", default=None, help=cost_help)
    p.add_argument(
        "--include-empty-resource-id",
        action="store_true",
        help="Include rows where lineItem/ResourceId is empty",
    )
    p.add_argument(
        "--usage-account-id",
        default=None,
        help="Only include rows for this lineItem/UsageAccountId",
    )
    p.add_argument(
        "--product-code",
        default=None,
        help="Only include rows for this lineItem/ProductCode, e.g. AmazonEC2",
    )
    p.add_argument(
        "--start-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate >= this value (YYYY-MM-DD or ISO timestamp)",
    )
    p.add_argument(
        "--end-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate < this value (YYYY-MM-DD or ISO timestamp)",
    )
    return p.parse_args()
def to_decimal(value: Optional[str]) -> Decimal:
    """Convert a CSV cell to Decimal; None, blank, and unparseable values become 0."""
    text = (value or "").strip()
    if not text:
        return Decimal("0")
    try:
        return Decimal(text)
    except (InvalidOperation, ValueError):
        # Malformed numbers are treated as zero rather than aborting the run.
        return Decimal("0")
def detect_cost_column(fieldnames: Iterable[str], requested: Optional[str]) -> str:
    """Choose the cost column to aggregate.

    A user-requested column wins (ValueError if it is not in *fieldnames*);
    otherwise the first DEFAULT_COST_CANDIDATES entry present is used.
    Raises ValueError when no candidate matches.
    """
    available = set(fieldnames)
    if requested:
        if requested in available:
            return requested
        raise ValueError(f"Requested cost column '{requested}' not found in file")
    for candidate in DEFAULT_COST_CANDIDATES:
        if candidate in available:
            return candidate
    raise ValueError(
        "Could not find a usable cost column. "
        f"Tried: {', '.join(DEFAULT_COST_CANDIDATES)}"
    )
def date_in_range(
    usage_start: str,
    start_date: Optional[str],
    end_date: Optional[str],
) -> bool:
    """Return True if *usage_start* lies in the half-open range [start_date, end_date).

    An empty usage_start never matches. Bounds that are None/empty are not
    applied. ISO-8601 strings compare correctly lexicographically, so plain
    string comparison suffices.
    """
    if not usage_start:
        return False
    lower_ok = not start_date or usage_start >= start_date
    upper_ok = not end_date or usage_start < end_date
    return lower_ok and upper_ok
def row_matches_filters(row: dict, args: argparse.Namespace) -> bool:
    """Return True when *row* passes every filter selected on the command line."""

    def cell(col: str) -> str:
        # Missing keys and None values both read as empty strings.
        return (row.get(col) or "").strip()

    if not cell(RESOURCE_ID_COL) and not args.include_empty_resource_id:
        return False
    if args.usage_account_id and cell(USAGE_ACCOUNT_COL) != args.usage_account_id:
        return False
    if args.product_code and cell(PRODUCT_CODE_COL) != args.product_code:
        return False
    if (args.start_date or args.end_date) and not date_in_range(
        row.get(USAGE_START_COL, ""), args.start_date, args.end_date
    ):
        return False
    return True
def aggregate_cur(args: argparse.Namespace) -> Tuple[dict, List[str]]:
    """Stream every input CSV and sum usage/cost per aggregation key.

    Returns (aggregates, cost_columns_used): *aggregates* maps a key tuple —
    (resource_id,) or (resource_id, usage_type) depending on args.mode — to
    running Decimal totals for usage_amount, cost, and line_count;
    *cost_columns_used* records the cost column chosen for each file, in order.
    Raises ValueError for a file with no header row.
    """
    aggregates: Dict[Tuple[str, ...], Dict[str, Decimal]] = defaultdict(
        lambda: {"usage_amount": Decimal("0"), "cost": Decimal("0"), "line_count": Decimal("0")}
    )
    cost_columns_used: List[str] = []
    for input_file in args.input_files:
        # utf-8-sig transparently strips a BOM if the export has one.
        with open(input_file, "r", newline="", encoding="utf-8-sig") as handle:
            reader = csv.DictReader(handle)
            if not reader.fieldnames:
                raise ValueError(f"Input CSV has no header row: {input_file}")
            # The cost column may differ between files, so detect per file.
            cost_column = detect_cost_column(reader.fieldnames, args.cost_column)
            cost_columns_used.append(cost_column)
            for row in reader:
                if not row_matches_filters(row, args):
                    continue
                resource_id = (row.get(RESOURCE_ID_COL) or "").strip()
                if args.mode == "resource":
                    key = (resource_id,)
                else:
                    key = (resource_id, (row.get(USAGE_TYPE_COL) or "").strip())
                bucket = aggregates[key]
                bucket["usage_amount"] += to_decimal(row.get(USAGE_AMOUNT_COL))
                bucket["cost"] += to_decimal(row.get(cost_column))
                bucket["line_count"] += Decimal("1")
    return aggregates, cost_columns_used
def write_output(
    output_file: str,
    mode: str,
    aggregates: dict,
    cost_columns_used: List[str],
) -> None:
    """Write the aggregated totals to *output_file* as CSV, highest cost first.

    The key columns depend on *mode*: resource_id only, or resource_id plus
    usage_type. Ties on cost are broken by usage_amount (both descending).
    """
    key_fields = ["resource_id"] if mode == "resource" else ["resource_id", "usage_type"]
    fieldnames = key_fields + ["total_usage_amount", "total_cost", "line_count", "cost_columns_used"]
    ordered = sorted(
        aggregates.items(),
        key=lambda item: (item[1]["cost"], item[1]["usage_amount"]),
        reverse=True,
    )
    # Same label on every row: the distinct cost columns seen across inputs.
    columns_label = ",".join(sorted(set(cost_columns_used)))
    with open(output_file, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        for key, totals in ordered:
            row = dict(zip(key_fields, key))
            row["total_usage_amount"] = str(totals["usage_amount"])
            row["total_cost"] = str(totals["cost"])
            row["line_count"] = str(int(totals["line_count"]))
            row["cost_columns_used"] = columns_label
            writer.writerow(row)
def main() -> None:
    """Entry point: parse arguments, aggregate the CUR files, write the report."""
    args = parse_args()
    aggregates, cost_columns_used = aggregate_cur(args)
    write_output(args.output, args.mode, aggregates, cost_columns_used)
    distinct_columns = ", ".join(sorted(set(cost_columns_used)))
    print(f"Done. Wrote {len(aggregates)} aggregated rows to {args.output}")
    print(f"Processed files: {len(args.input_files)}")
    print(f"Cost columns used: {distinct_columns}")


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Aggregate multiple files by resource ID:
python cur-exporter.py jan.csv feb.csv mar.csv --mode resource --output by_resource.csv

Aggregate multiple files by resource ID + usage type:
python cur-exporter.py jan.csv feb.csv mar.csv --mode resource-usagetype --output by_resource_usagetype.csv