cur_aggregate_parquet.py: aggregate AWS CUR Parquet cost and usage by resource ID, optionally split by usage type.
#!/usr/bin/env python3
import argparse
import csv
from decimal import Decimal, InvalidOperation
from collections import defaultdict
from typing import Dict, Tuple, Optional, List

import pyarrow.dataset as ds

# Cost columns to try, in preference order, when --cost-column is not given.
DEFAULT_COST_CANDIDATES = [
    "lineItem/NetUnblendedCost",
    "lineItem/UnblendedCost",
    "reservation/EffectiveCost",
    "reservation/NetEffectiveCost",
    "savingsPlan/SavingsPlanEffectiveCost",
    "savingsPlan/NetSavingsPlanEffectiveCost",
    "lineItem/BlendedCost",
]

RESOURCE_ID_COL = "lineItem/ResourceId"
USAGE_TYPE_COL = "lineItem/UsageType"
USAGE_AMOUNT_COL = "lineItem/UsageAmount"
USAGE_START_COL = "lineItem/UsageStartDate"
PRODUCT_CODE_COL = "lineItem/ProductCode"
USAGE_ACCOUNT_COL = "lineItem/UsageAccountId"


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Aggregate AWS CUR Parquet cost and usage by resource ID, optionally split by usage type."
    )
    parser.add_argument(
        "input_paths",
        nargs="+",
        help="One or more Parquet files and/or directories",
    )
    parser.add_argument(
        "--mode",
        choices=["resource", "resource-usagetype"],
        default="resource",
        help="Aggregation mode",
    )
    parser.add_argument(
        "--output",
        default="output.csv",
        help="Output CSV file path",
    )
    parser.add_argument(
        "--cost-column",
        default=None,
        help=(
            "Specific cost column to use. "
            "If omitted, the script auto-selects the first existing column from: "
            + ", ".join(DEFAULT_COST_CANDIDATES)
        ),
    )
    parser.add_argument(
        "--include-empty-resource-id",
        action="store_true",
        help="Include rows where lineItem/ResourceId is empty",
    )
    parser.add_argument(
        "--usage-account-id",
        default=None,
        help="Only include rows for this lineItem/UsageAccountId",
    )
    parser.add_argument(
        "--product-code",
        default=None,
        help="Only include rows for this lineItem/ProductCode, e.g. AmazonEC2",
    )
    parser.add_argument(
        "--start-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate >= this value (YYYY-MM-DD or ISO timestamp)",
    )
    parser.add_argument(
        "--end-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate < this value (YYYY-MM-DD or ISO timestamp)",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=100_000,
        help="Number of rows per batch to process from Parquet",
    )
    return parser.parse_args()


def to_decimal(value: Optional[object]) -> Decimal:
    """Parse a cell into a Decimal, treating None, empty, and NaN as zero."""
    if value is None:
        return Decimal("0")
    text = str(value).strip()
    if text == "" or text.lower() == "nan":
        return Decimal("0")
    try:
        return Decimal(text)
    except (InvalidOperation, ValueError):
        return Decimal("0")


def detect_cost_column(schema_names: List[str], requested: Optional[str]) -> str:
    if requested:
        if requested not in schema_names:
            raise ValueError(f"Requested cost column '{requested}' not found in dataset")
        return requested
    for col in DEFAULT_COST_CANDIDATES:
        if col in schema_names:
            return col
    raise ValueError(
        "Could not find a usable cost column. "
        f"Tried: {', '.join(DEFAULT_COST_CANDIDATES)}"
    )


def build_filter(args: argparse.Namespace):
    """Combine the optional CLI filters into a single pyarrow dataset expression."""
    expr = None
    if args.usage_account_id:
        cond = ds.field(USAGE_ACCOUNT_COL) == args.usage_account_id
        expr = cond if expr is None else expr & cond
    if args.product_code:
        cond = ds.field(PRODUCT_CODE_COL) == args.product_code
        expr = cond if expr is None else expr & cond
    if args.start_date:
        cond = ds.field(USAGE_START_COL) >= args.start_date
        expr = cond if expr is None else expr & cond
    if args.end_date:
        cond = ds.field(USAGE_START_COL) < args.end_date
        expr = cond if expr is None else expr & cond
    if not args.include_empty_resource_id:
        cond = ds.field(RESOURCE_ID_COL) != ""
        expr = cond if expr is None else expr & cond
    return expr


def make_dataset(paths: List[str]) -> ds.Dataset:
    # pyarrow can build a dataset from a single file, a list of files, or directories.
    return ds.dataset(paths, format="parquet")


def aggregate_parquet(args: argparse.Namespace):
    dataset = make_dataset(args.input_paths)
    schema_names = dataset.schema.names
    cost_column = detect_cost_column(schema_names, args.cost_column)

    required_columns = [
        RESOURCE_ID_COL,
        USAGE_TYPE_COL,
        USAGE_AMOUNT_COL,
        USAGE_START_COL,
        PRODUCT_CODE_COL,
        USAGE_ACCOUNT_COL,
        cost_column,
    ]
    missing = [c for c in required_columns if c not in schema_names]
    if missing:
        raise ValueError(f"Missing required columns in dataset: {', '.join(missing)}")

    filter_expr = build_filter(args)
    aggregates: Dict[Tuple[str, ...], Dict[str, Decimal]] = defaultdict(
        lambda: {"usage_amount": Decimal("0"), "cost": Decimal("0"), "line_count": Decimal("0")}
    )
    scanner = dataset.scanner(
        columns=required_columns,
        filter=filter_expr,
        batch_size=args.batch_size,
        use_threads=True,
    )
    for record_batch in scanner.to_batches():
        batch = record_batch.to_pydict()
        resource_ids = batch.get(RESOURCE_ID_COL, [])
        usage_types = batch.get(USAGE_TYPE_COL, [])
        usage_amounts = batch.get(USAGE_AMOUNT_COL, [])
        costs = batch.get(cost_column, [])
        for resource_id, usage_type, usage_amount, cost in zip(
            resource_ids, usage_types, usage_amounts, costs
        ):
            resource_id = "" if resource_id is None else str(resource_id).strip()
            usage_type = "" if usage_type is None else str(usage_type).strip()
            if not args.include_empty_resource_id and not resource_id:
                continue
            usage_amount_dec = to_decimal(usage_amount)
            cost_dec = to_decimal(cost)
            if args.mode == "resource":
                key = (resource_id,)
            else:
                key = (resource_id, usage_type)
            aggregates[key]["usage_amount"] += usage_amount_dec
            aggregates[key]["cost"] += cost_dec
            aggregates[key]["line_count"] += Decimal("1")
    return aggregates, cost_column


def write_output(output_file: str, mode: str, aggregates: dict, cost_column: str) -> None:
    if mode == "resource":
        fieldnames = [
            "resource_id",
            "total_usage_amount",
            "total_cost",
            "line_count",
            "cost_column_used",
        ]
    else:
        fieldnames = [
            "resource_id",
            "usage_type",
            "total_usage_amount",
            "total_cost",
            "line_count",
            "cost_column_used",
        ]
    # Most expensive first, then by usage amount descending
    sorted_items = sorted(
        aggregates.items(),
        key=lambda item: (item[1]["cost"], item[1]["usage_amount"]),
        reverse=True,
    )
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for key, data in sorted_items:
            row = {
                "resource_id": key[0],
                "total_usage_amount": str(data["usage_amount"]),
                "total_cost": str(data["cost"]),
                "line_count": str(int(data["line_count"])),
                "cost_column_used": cost_column,
            }
            if mode != "resource":
                row["usage_type"] = key[1]
            writer.writerow(row)


def main() -> None:
    args = parse_args()
    aggregates, cost_column = aggregate_parquet(args)
    write_output(args.output, args.mode, aggregates, cost_column)
    print(f"Done. Wrote {len(aggregates)} aggregated rows to {args.output}")
    print(f"Processed input paths: {len(args.input_paths)}")
    print(f"Cost column used: {cost_column}")


if __name__ == "__main__":
    main()
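The output CSV is sorted by total cost descending, then by usage amount. In resource mode a row looks roughly like this (the data values below are made up for illustration; the cost column reflects whatever the script auto-selected):

resource_id,total_usage_amount,total_cost,line_count,cost_column_used
i-0abc123def4567890,744,52.41,744,lineItem/UnblendedCost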
resource id:
python cur_aggregate_parquet.py report1.parquet report2.parquet --mode resource --output by_resource.csv

resource id with usage type:
python cur_aggregate_parquet.py ./cur-parquet/ --mode resource-usagetype --output by_resource_usage_type.csv

You can specify a folder path, use ., or a shell glob such as *.parquet.
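The filter flags can also be combined to narrow the scan before aggregating; for example, EC2 usage for one account over March (the account ID and date range here are placeholders):
python cur_aggregate_parquet.py ./cur-parquet/ --product-code AmazonEC2 --usage-account-id 123456789012 --start-date 2026-03-01 --end-date 2026-04-01 --output ec2_march.csv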