Skip to content

Instantly share code, notes, and snippets.

@Burekasim
Created March 29, 2026 13:19
Show Gist options
  • Select an option

  • Save Burekasim/3ef9e855694fde93edbe00b988f70650 to your computer and use it in GitHub Desktop.

Select an option

Save Burekasim/3ef9e855694fde93edbe00b988f70650 to your computer and use it in GitHub Desktop.
AWS Cost and Usage Report (CUR) exporter — aggregates cost and usage by resource ID
#!/usr/bin/env python3
import argparse
import csv
from decimal import Decimal, InvalidOperation
from collections import defaultdict
from typing import Dict, Tuple, Iterable, Optional, List
# Cost columns to try, in priority order, when the user does not pick one.
DEFAULT_COST_CANDIDATES = [
    "lineItem/NetUnblendedCost",
    "lineItem/UnblendedCost",
    "reservation/EffectiveCost",
    "reservation/NetEffectiveCost",
    "savingsPlan/SavingsPlanEffectiveCost",
    "savingsPlan/NetSavingsPlanEffectiveCost",
    "lineItem/BlendedCost",
]

# CUR column names used for grouping and filtering.
RESOURCE_ID_COL = "lineItem/ResourceId"
USAGE_TYPE_COL = "lineItem/UsageType"
USAGE_AMOUNT_COL = "lineItem/UsageAmount"
USAGE_START_COL = "lineItem/UsageStartDate"
PRODUCT_CODE_COL = "lineItem/ProductCode"
USAGE_ACCOUNT_COL = "lineItem/UsageAccountId"


def parse_args() -> argparse.Namespace:
    """Build and evaluate the command-line interface for the exporter."""
    ap = argparse.ArgumentParser(
        description="Aggregate AWS CUR cost and usage by resource ID, optionally split by usage type."
    )
    ap.add_argument(
        "input_files",
        nargs="+",
        help="One or more CUR CSV files",
    )
    ap.add_argument(
        "--mode",
        choices=["resource", "resource-usagetype"],
        default="resource",
        help="Aggregation mode",
    )
    ap.add_argument(
        "--output",
        default="output.csv",
        help="Output CSV file path",
    )
    ap.add_argument(
        "--cost-column",
        default=None,
        help=(
            "Specific cost column to use. "
            "If omitted, the script auto-selects the first existing column from: "
            + ", ".join(DEFAULT_COST_CANDIDATES)
        ),
    )
    ap.add_argument(
        "--include-empty-resource-id",
        action="store_true",
        help="Include rows where lineItem/ResourceId is empty",
    )
    ap.add_argument(
        "--usage-account-id",
        default=None,
        help="Only include rows for this lineItem/UsageAccountId",
    )
    ap.add_argument(
        "--product-code",
        default=None,
        help="Only include rows for this lineItem/ProductCode, e.g. AmazonEC2",
    )
    ap.add_argument(
        "--start-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate >= this value (YYYY-MM-DD or ISO timestamp)",
    )
    ap.add_argument(
        "--end-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate < this value (YYYY-MM-DD or ISO timestamp)",
    )
    return ap.parse_args()
def to_decimal(value: Optional[str]) -> Decimal:
    """Parse *value* as a Decimal; None, blank, and unparsable inputs become 0."""
    text = (value or "").strip()
    if not text:
        return Decimal("0")
    try:
        return Decimal(text)
    except (InvalidOperation, ValueError):
        # CUR exports sometimes carry non-numeric placeholders; treat as zero.
        return Decimal("0")
def detect_cost_column(fieldnames: Iterable[str], requested: Optional[str]) -> str:
    """Return the cost column to aggregate.

    If *requested* is given it must exist in *fieldnames*; otherwise the first
    candidate from DEFAULT_COST_CANDIDATES present in the header is chosen.
    Raises ValueError when no usable column can be found.
    """
    available = list(fieldnames)
    if requested:
        if requested in available:
            return requested
        raise ValueError(f"Requested cost column '{requested}' not found in file")
    for candidate in DEFAULT_COST_CANDIDATES:
        if candidate in available:
            return candidate
    raise ValueError(
        "Could not find a usable cost column. "
        f"Tried: {', '.join(DEFAULT_COST_CANDIDATES)}"
    )
def date_in_range(
    usage_start: str,
    start_date: Optional[str],
    end_date: Optional[str],
) -> bool:
    """True when *usage_start* falls in [start_date, end_date).

    Relies on ISO-8601 strings comparing correctly lexicographically; an
    empty *usage_start* never matches.
    """
    if not usage_start:
        return False
    too_early = bool(start_date) and usage_start < start_date
    too_late = bool(end_date) and usage_start >= end_date
    return not (too_early or too_late)
def row_matches_filters(row: dict, args: argparse.Namespace) -> bool:
    """Return True when the CUR *row* passes every CLI filter in *args*."""

    def cell(column: str) -> str:
        # Missing keys and None values both collapse to the empty string.
        return (row.get(column) or "").strip()

    if not (args.include_empty_resource_id or cell(RESOURCE_ID_COL)):
        return False
    if args.usage_account_id and cell(USAGE_ACCOUNT_COL) != args.usage_account_id:
        return False
    if args.product_code and cell(PRODUCT_CODE_COL) != args.product_code:
        return False
    if (args.start_date or args.end_date) and not date_in_range(
        row.get(USAGE_START_COL, ""), args.start_date, args.end_date
    ):
        return False
    return True
def aggregate_cur(args: argparse.Namespace) -> Tuple[dict, List[str]]:
    """Stream every input CSV and sum usage, cost, and line counts per key.

    Returns the aggregate dict (keyed by (resource_id,) or
    (resource_id, usage_type) depending on mode) and the list of cost
    columns used, one entry per input file.
    """
    totals: Dict[Tuple[str, ...], Dict[str, Decimal]] = defaultdict(
        lambda: {"usage_amount": Decimal("0"), "cost": Decimal("0"), "line_count": Decimal("0")}
    )
    cost_columns_used: List[str] = []
    split_by_usage_type = args.mode != "resource"
    for input_file in args.input_files:
        # utf-8-sig strips the BOM some CUR exports carry.
        with open(input_file, "r", newline="", encoding="utf-8-sig") as handle:
            reader = csv.DictReader(handle)
            if not reader.fieldnames:
                raise ValueError(f"Input CSV has no header row: {input_file}")
            cost_column = detect_cost_column(reader.fieldnames, args.cost_column)
            cost_columns_used.append(cost_column)
            for row in reader:
                if not row_matches_filters(row, args):
                    continue
                resource_id = (row.get(RESOURCE_ID_COL) or "").strip()
                if split_by_usage_type:
                    key = (resource_id, (row.get(USAGE_TYPE_COL) or "").strip())
                else:
                    key = (resource_id,)
                bucket = totals[key]
                bucket["usage_amount"] += to_decimal(row.get(USAGE_AMOUNT_COL))
                bucket["cost"] += to_decimal(row.get(cost_column))
                bucket["line_count"] += Decimal("1")
    return totals, cost_columns_used
def write_output(
    output_file: str,
    mode: str,
    aggregates: dict,
    cost_columns_used: List[str],
) -> None:
    """Write the aggregated totals to *output_file* as CSV.

    Rows are sorted by total cost, then total usage amount, descending.
    In "resource" mode keys are (resource_id,); in any other mode keys are
    (resource_id, usage_type) and an extra usage_type column is emitted.
    The de-duplicated, sorted set of cost columns is repeated on every row
    so the output is self-describing.
    """
    # Fix: the original duplicated the field list and assigned resource_id
    # identically in both branches of the mode conditional; build the header
    # once and only branch on the extra usage_type column.
    fieldnames = ["resource_id", "total_usage_amount", "total_cost", "line_count", "cost_columns_used"]
    if mode != "resource":
        fieldnames.insert(1, "usage_type")
    sorted_items = sorted(
        aggregates.items(),
        key=lambda item: (item[1]["cost"], item[1]["usage_amount"]),
        reverse=True,
    )
    unique_cost_columns = ",".join(sorted(set(cost_columns_used)))
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for key, data in sorted_items:
            row = {
                "resource_id": key[0],
                "total_usage_amount": str(data["usage_amount"]),
                "total_cost": str(data["cost"]),
                "line_count": str(int(data["line_count"])),
                "cost_columns_used": unique_cost_columns,
            }
            if mode != "resource":
                row["usage_type"] = key[1]
            writer.writerow(row)
def main() -> None:
    """CLI entry point: parse arguments, aggregate the CUR files, write output."""
    args = parse_args()
    totals, cost_columns = aggregate_cur(args)
    write_output(args.output, args.mode, totals, cost_columns)
    print(f"Done. Wrote {len(totals)} aggregated rows to {args.output}")
    print(f"Processed files: {len(args.input_files)}")
    print(f"Cost columns used: {', '.join(sorted(set(cost_columns)))}")


if __name__ == "__main__":
    main()
@Burekasim
Copy link
Copy Markdown
Author

Aggregate multiple files by resource ID:
python cur-exporter.py jan.csv feb.csv mar.csv --mode resource --output by_resource.csv
Aggregate multiple files by resource ID + usage type:
python cur-exporter.py jan.csv feb.csv mar.csv --mode resource-usagetype --output by_resource_usagetype.csv

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment