@Burekasim · Created March 29, 2026
cur_aggregate_parquet.py: aggregate AWS CUR (Cost and Usage Report) Parquet data by resource ID
#!/usr/bin/env python3
"""Aggregate AWS CUR Parquet cost and usage by resource ID, optionally split by usage type."""
import argparse
import csv
from collections import defaultdict
from decimal import Decimal, InvalidOperation
from typing import Dict, List, Optional, Tuple

import pyarrow.dataset as ds

# Cost columns to try, in order of preference, when --cost-column is not given.
DEFAULT_COST_CANDIDATES = [
    "lineItem/NetUnblendedCost",
    "lineItem/UnblendedCost",
    "reservation/EffectiveCost",
    "reservation/NetEffectiveCost",
    "savingsPlan/SavingsPlanEffectiveCost",
    "savingsPlan/NetSavingsPlanEffectiveCost",
    "lineItem/BlendedCost",
]

RESOURCE_ID_COL = "lineItem/ResourceId"
USAGE_TYPE_COL = "lineItem/UsageType"
USAGE_AMOUNT_COL = "lineItem/UsageAmount"
USAGE_START_COL = "lineItem/UsageStartDate"
PRODUCT_CODE_COL = "lineItem/ProductCode"
USAGE_ACCOUNT_COL = "lineItem/UsageAccountId"

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Aggregate AWS CUR Parquet cost and usage by resource ID, optionally split by usage type."
    )
    parser.add_argument(
        "input_paths",
        nargs="+",
        help="One or more Parquet files and/or directories",
    )
    parser.add_argument(
        "--mode",
        choices=["resource", "resource-usagetype"],
        default="resource",
        help="Aggregation mode",
    )
    parser.add_argument(
        "--output",
        default="output.csv",
        help="Output CSV file path",
    )
    parser.add_argument(
        "--cost-column",
        default=None,
        help=(
            "Specific cost column to use. "
            "If omitted, the script auto-selects the first existing column from: "
            + ", ".join(DEFAULT_COST_CANDIDATES)
        ),
    )
    parser.add_argument(
        "--include-empty-resource-id",
        action="store_true",
        help="Include rows where lineItem/ResourceId is empty",
    )
    parser.add_argument(
        "--usage-account-id",
        default=None,
        help="Only include rows for this lineItem/UsageAccountId",
    )
    parser.add_argument(
        "--product-code",
        default=None,
        help="Only include rows for this lineItem/ProductCode, e.g. AmazonEC2",
    )
    parser.add_argument(
        "--start-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate >= this value (YYYY-MM-DD or ISO timestamp)",
    )
    parser.add_argument(
        "--end-date",
        default=None,
        help="Only include rows with lineItem/UsageStartDate < this value (YYYY-MM-DD or ISO timestamp)",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=100_000,
        help="Number of rows per batch to process from Parquet",
    )
    return parser.parse_args()

def to_decimal(value: Optional[object]) -> Decimal:
    """Coerce a raw cell value to Decimal, treating None, blanks, and NaN as zero."""
    if value is None:
        return Decimal("0")
    text = str(value).strip()
    if text == "" or text.lower() == "nan":
        return Decimal("0")
    try:
        return Decimal(text)
    except (InvalidOperation, ValueError):
        return Decimal("0")

def detect_cost_column(schema_names: List[str], requested: Optional[str]) -> str:
    if requested:
        if requested not in schema_names:
            raise ValueError(f"Requested cost column '{requested}' not found in dataset")
        return requested
    for col in DEFAULT_COST_CANDIDATES:
        if col in schema_names:
            return col
    raise ValueError(
        "Could not find a usable cost column. "
        f"Tried: {', '.join(DEFAULT_COST_CANDIDATES)}"
    )

def build_filter(args: argparse.Namespace):
    """Build a pyarrow filter expression by AND-ing all requested row filters."""
    expr = None
    if args.usage_account_id:
        cond = ds.field(USAGE_ACCOUNT_COL) == args.usage_account_id
        expr = cond if expr is None else expr & cond
    if args.product_code:
        cond = ds.field(PRODUCT_CODE_COL) == args.product_code
        expr = cond if expr is None else expr & cond
    if args.start_date:
        cond = ds.field(USAGE_START_COL) >= args.start_date
        expr = cond if expr is None else expr & cond
    if args.end_date:
        cond = ds.field(USAGE_START_COL) < args.end_date
        expr = cond if expr is None else expr & cond
    if not args.include_empty_resource_id:
        cond = ds.field(RESOURCE_ID_COL) != ""
        expr = cond if expr is None else expr & cond
    return expr

def make_dataset(paths: List[str]) -> ds.Dataset:
    # ds.dataset() accepts a single file or directory directly, but a *list*
    # of sources must contain only files. Wrapping each path in its own
    # dataset and unioning them lets files and directories be mixed.
    if len(paths) == 1:
        return ds.dataset(paths[0], format="parquet")
    return ds.dataset([ds.dataset(path, format="parquet") for path in paths])

def aggregate_parquet(args: argparse.Namespace):
    dataset = make_dataset(args.input_paths)
    schema_names = dataset.schema.names
    cost_column = detect_cost_column(schema_names, args.cost_column)

    required_columns = [
        RESOURCE_ID_COL,
        USAGE_TYPE_COL,
        USAGE_AMOUNT_COL,
        USAGE_START_COL,
        PRODUCT_CODE_COL,
        USAGE_ACCOUNT_COL,
        cost_column,
    ]
    missing = [c for c in required_columns if c not in schema_names]
    if missing:
        raise ValueError(f"Missing required columns in dataset: {', '.join(missing)}")

    filter_expr = build_filter(args)
    aggregates: Dict[Tuple[str, ...], Dict[str, Decimal]] = defaultdict(
        lambda: {"usage_amount": Decimal("0"), "cost": Decimal("0"), "line_count": Decimal("0")}
    )

    # Stream the dataset in batches so memory stays bounded regardless of CUR size.
    scanner = dataset.scanner(
        columns=required_columns,
        filter=filter_expr,
        batch_size=args.batch_size,
        use_threads=True,
    )
    for record_batch in scanner.to_batches():
        batch = record_batch.to_pydict()
        resource_ids = batch.get(RESOURCE_ID_COL, [])
        usage_types = batch.get(USAGE_TYPE_COL, [])
        usage_amounts = batch.get(USAGE_AMOUNT_COL, [])
        costs = batch.get(cost_column, [])
        for resource_id, usage_type, usage_amount, cost in zip(
            resource_ids, usage_types, usage_amounts, costs
        ):
            resource_id = "" if resource_id is None else str(resource_id).strip()
            usage_type = "" if usage_type is None else str(usage_type).strip()
            # Re-check on the Python side: whitespace-only IDs pass the dataset filter.
            if not args.include_empty_resource_id and not resource_id:
                continue
            usage_amount_dec = to_decimal(usage_amount)
            cost_dec = to_decimal(cost)
            if args.mode == "resource":
                key = (resource_id,)
            else:
                key = (resource_id, usage_type)
            aggregates[key]["usage_amount"] += usage_amount_dec
            aggregates[key]["cost"] += cost_dec
            aggregates[key]["line_count"] += Decimal("1")
    return aggregates, cost_column

def write_output(output_file: str, mode: str, aggregates: dict, cost_column: str) -> None:
    fieldnames = ["resource_id"]
    if mode != "resource":
        fieldnames.append("usage_type")
    fieldnames += ["total_usage_amount", "total_cost", "line_count", "cost_column_used"]

    # Most expensive first, then by usage amount descending.
    sorted_items = sorted(
        aggregates.items(),
        key=lambda item: (item[1]["cost"], item[1]["usage_amount"]),
        reverse=True,
    )
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for key, data in sorted_items:
            row = {
                "resource_id": key[0],
                "total_usage_amount": str(data["usage_amount"]),
                "total_cost": str(data["cost"]),
                "line_count": str(int(data["line_count"])),
                "cost_column_used": cost_column,
            }
            if mode != "resource":
                row["usage_type"] = key[1]
            writer.writerow(row)

def main() -> None:
    args = parse_args()
    aggregates, cost_column = aggregate_parquet(args)
    write_output(args.output, args.mode, aggregates, cost_column)
    print(f"Done. Wrote {len(aggregates)} aggregated rows to {args.output}")
    print(f"Processed input paths: {len(args.input_paths)}")
    print(f"Cost column used: {cost_column}")


if __name__ == "__main__":
    main()
@Burekasim (Author)
Resource ID:

python cur_aggregate_parquet.py report1.parquet report2.parquet --mode resource --output by_resource.csv

Resource ID with usage type:

python cur_aggregate_parquet.py ./cur-parquet/ --mode resource-usagetype --output by_resource_usage_type.csv
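
The filter flags can be combined with either mode; for example (the account ID and dates below are placeholders):

python cur_aggregate_parquet.py ./cur-parquet/ --mode resource --usage-account-id 123456789012 --product-code AmazonEC2 --start-date 2026-03-01 --end-date 2026-04-01 --output ec2_march_by_resource.csv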

You can specify a folder path, use ., or pass a glob such as *.parquet. The only third-party dependency is pyarrow (pip install pyarrow).
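
The output CSV is sorted by cost descending. In resource mode the header is the one below; the data row here is made up purely for illustration:

resource_id,total_usage_amount,total_cost,line_count,cost_column_used
i-0123456789abcdef0,744,53.28,744,lineItem/NetUnblendedCost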
