@polius
Created May 19, 2025 07:54
A Python script that extracts and aggregates AWS costs by resource from the AWS Cost and Usage Report (CUR).
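The script expects a CUR export downloaded locally: point source_directory at the billing-period folder and every gzipped report part beneath it is picked up recursively. A hypothetical layout (actual file names depend on your report definition):

20250401-20250501/
    my-cur-report-00001.csv.gz
    my-cur-report-00002.csv.gz

It keeps only the daily (full-day) line items, drops duplicates across report parts, sums the unblended cost per resource, usage type and line item description, prints the grand total, and writes the result to <source_directory>.csv.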
import os
import re
import gzip
import pandas as pd
from glob import glob

def find_csv_gz_files(root_dir):
    """Recursively find all .csv.gz files, sorted in ascending order."""
    return sorted(glob(os.path.join(root_dir, '**', '*.csv.gz'), recursive=True))

def read_and_filter_csv_gz(file_path, resources):
    """Read a .csv.gz file and filter rows to daily line items for the given resources."""
    print(f"Reading file: {file_path}")
    with gzip.open(file_path, 'rt') as f:
        df = pd.read_csv(f, low_memory=False)
    # Convert date columns to datetime
    df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate'], errors='coerce')
    df['lineItem/UsageEndDate'] = pd.to_datetime(df['lineItem/UsageEndDate'], errors='coerce')
    # Convert the cost to float
    df['lineItem/UnblendedCost'] = pd.to_numeric(df['lineItem/UnblendedCost'], errors='coerce')
    # Keep only daily line items: both timestamps at midnight, exactly one day apart
    mask = (
        (df['lineItem/UsageStartDate'].dt.hour == 0) & (df['lineItem/UsageStartDate'].dt.minute == 0) & (df['lineItem/UsageStartDate'].dt.second == 0) &  # Start time is 00:00:00
        (df['lineItem/UsageEndDate'].dt.hour == 0) & (df['lineItem/UsageEndDate'].dt.minute == 0) & (df['lineItem/UsageEndDate'].dt.second == 0) &  # End time is 00:00:00
        (df['lineItem/UsageEndDate'] == df['lineItem/UsageStartDate'] + pd.Timedelta(days=1))  # End date is start + 1 day
    )
    # Create a regex pattern to match any of the resource IDs
    pattern = '|'.join(re.escape(term) for term in resources)
    # Filter the DataFrame based on the regex pattern and the mask
    return df[
        df['lineItem/ResourceId'].str.contains(pattern, na=False, case=False)
        & mask
    ]

def merge_filtered_csv_gz_files(root_dir, resources):
    """Find, read, filter, and merge .csv.gz files into one DataFrame."""
    all_files = find_csv_gz_files(root_dir)
    print(f"Found {len(all_files)} .csv.gz files.")
    filtered_dfs = [read_and_filter_csv_gz(f, resources) for f in all_files]
    merged_df = pd.concat(filtered_dfs, ignore_index=True)
    # Drop rows that appear in more than one report part
    distinct_df = merged_df.drop_duplicates(subset=[
        'lineItem/UsageStartDate',
        'lineItem/UsageEndDate',
        'lineItem/ResourceId',
        'product/usagetype'
    ], keep='first')
    return distinct_df

def group_and_aggregate(df):
    """Group and sum UnblendedCost by lineItem/ResourceId, product/usagetype and lineItem/LineItemDescription."""
    required_cols = [
        'lineItem/ResourceId',
        'product/usagetype',
        'lineItem/UnblendedCost',
        'lineItem/LineItemDescription'
    ]
    # Ensure required columns are present
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")
    # Group and aggregate
    grouped_df = df[required_cols].groupby(
        ['lineItem/ResourceId', 'product/usagetype', 'lineItem/LineItemDescription'],
        as_index=False,
        dropna=False
    ).agg({'lineItem/UnblendedCost': 'sum'})
    # Sort by UnblendedCost in descending order
    grouped_df = grouped_df.sort_values(by='lineItem/UnblendedCost', ascending=False)
    return grouped_df

if __name__ == "__main__":
    # Set the source directory to scan (the CUR billing-period folder)
    source_directory = "20250401-20250501"
    # Define the resource IDs (or ID substrings) to filter on; an empty string matches every resource
    resources = ['']
    # Merge and filter the CSV files
    df = merge_filtered_csv_gz_files(source_directory, resources)
    df = group_and_aggregate(df)
    # Sum the total unblended costs
    total_cost = df['lineItem/UnblendedCost'].sum()
    print(f"Total costs: ${total_cost:,.2f}")
    # Save the aggregated DataFrame to a CSV file
    df.to_csv(f"{source_directory}.csv", index=False)
    print(f"File '{source_directory}.csv' created.")