Skip to content

Instantly share code, notes, and snippets.

@huevos-y-bacon
Created June 9, 2025 17:28
Show Gist options
  • Save huevos-y-bacon/45c4d0b5e2f470c54b9afb154f456764 to your computer and use it in GitHub Desktop.
Save huevos-y-bacon/45c4d0b5e2f470c54b9afb154f456764 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import boto3
import json
import csv
from datetime import datetime, timedelta
import argparse
import sys
def get_cost_and_usage_paginated(days=30, output_file=None, csv_output=None):
    """
    Get AWS Cost and Usage data with pagination support.

    Queries Cost Explorer for daily BlendedCost grouped by linked account and
    service, follows NextPageToken until every page has been collected, writes
    the merged result to a JSON file and a CSV file, and prints a summary.

    Any failure while creating the clients, fetching data or writing an output
    file is reported on stdout and ends the process with exit status 1.

    Args:
        days (int): Number of days to look back
        output_file (str): JSON output filename (optional)
        csv_output (str): CSV output filename (optional)
    """
    # Calculate date range
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=days)
    start_str = start_date.strftime('%Y-%m-%d')
    end_str = end_date.strftime('%Y-%m-%d')
    print(f"Fetching cost data from {start_str} to {end_str}")

    # Initialize boto3 clients
    try:
        ce_client = boto3.client('ce')
        orgs_client = boto3.client('organizations')
    except Exception as e:
        print(f"Error initializing AWS client: {e}")
        print("Make sure your AWS credentials are configured properly.")
        sys.exit(1)

    # Get account aliases and current account info
    print("Fetching account information...")
    account_aliases = get_account_aliases(orgs_client)
    current_account_info = get_current_account_info()

    # Request parameters shared by every page; the page token is added to a
    # per-call copy so this dict is never mutated.
    request_params = {
        'TimePeriod': {
            'Start': start_str,
            'End': end_str,
        },
        'Granularity': 'DAILY',
        'Metrics': ['BlendedCost'],
        'GroupBy': [
            {'Type': 'DIMENSION', 'Key': 'LINKED_ACCOUNT'},
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
        ],
    }

    # Collect all results
    all_results = {
        'ResultsByTime': [],
        'GroupDefinitions': [],
        # Kept so existing consumers of the JSON file still find the key.
        'DimensionKey': None,
        # Per-dimension metadata returned alongside the grouped results.
        'DimensionValueAttributes': [],
    }
    next_page_token = None
    page_count = 0
    try:
        while True:
            page_count += 1
            print(f"Fetching page {page_count}...")
            page_params = dict(request_params)
            if next_page_token:
                page_params['NextPageToken'] = next_page_token

            response = ce_client.get_cost_and_usage(**page_params)

            # Merge results
            all_results['ResultsByTime'].extend(response.get('ResultsByTime', []))
            all_results['DimensionValueAttributes'].extend(
                response.get('DimensionValueAttributes', []))

            # Set metadata from first response
            if page_count == 1:
                all_results['GroupDefinitions'] = response.get('GroupDefinitions', [])
                all_results['DimensionKey'] = response.get('DimensionKey')

            # A missing or empty token means this was the last page.
            next_page_token = response.get('NextPageToken')
            if not next_page_token:
                break
    except Exception as e:
        print(f"Error fetching cost data: {e}")
        sys.exit(1)
    print(f"Successfully fetched {len(all_results['ResultsByTime'])} time periods across {page_count} pages")

    # Generate output filenames if not provided
    if not output_file:
        output_file = f"aws-detailed-usage-{days}-{end_str}-{current_account_info}.json"
    if not csv_output:
        csv_output = f"aws-detailed-usage-{days}-{end_str}-{current_account_info}.csv"

    # Write results to JSON file; default=str stringifies any value json
    # cannot serialise natively.
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(all_results, f, indent=2, default=str)
        print(f"JSON results written to {output_file}")
    except Exception as e:
        print(f"Error writing JSON output file: {e}")
        sys.exit(1)

    # Write results to CSV file
    try:
        write_csv_output(all_results, csv_output, account_aliases)
        print(f"CSV results written to {csv_output}")
    except Exception as e:
        print(f"Error writing CSV output file: {e}")
        sys.exit(1)

    # Print summary statistics
    print_summary(all_results, account_aliases)
def get_current_account_info():
    """
    Look up the caller's AWS account ID (used in default output filenames).

    Returns:
        str: The account ID reported by STS, or "unknown-account" when the
        lookup fails for any reason (a warning is printed in that case).
    """
    try:
        # STS reports the identity behind the active credentials
        identity = boto3.client('sts').get_caller_identity()
        caller_account = identity['Account']
    except Exception as e:
        print(f"Warning: Could not get current account info: {e}")
        return "unknown-account"
    else:
        return caller_account
def get_account_aliases(orgs_client):
    """
    Build an account-ID -> account-name lookup from AWS Organizations.

    Args:
        orgs_client: boto3 organizations client
    Returns:
        dict: Mapping of account ID to account name. An account without a
        name maps to its own ID. If the listing fails, a warning is printed
        and whatever was collected before the failure is returned.
    """
    names_by_id = {}
    try:
        pages = orgs_client.get_paginator('list_accounts').paginate()
        for listing in pages:
            for entry in listing['Accounts']:
                entry_id = entry['Id']
                names_by_id[entry_id] = entry.get('Name', entry_id)
        print(f"Found {len(names_by_id)} accounts in organization")
    except Exception as e:
        # Best effort only: callers fall back to showing raw account IDs
        print(f"Warning: Could not fetch account aliases from Organizations: {e}")
        print("Account aliases will show as account IDs")
    return names_by_id
def write_csv_output(results, csv_filename, account_aliases=None):
    """
    Write cost data to a CSV file, one row per (day, account, service) group.

    Rows whose cost is exactly zero are omitted. Negative amounts (credits,
    refunds) are written, so the CSV adds up to the same total that the
    summary reports.

    Args:
        results (dict): The cost and usage results
        csv_filename (str): Output CSV filename
        account_aliases (dict): Mapping of account ID to account name
    """
    if account_aliases is None:
        account_aliases = {}
    fieldnames = [
        'Date',
        'AccountId',
        'AccountAlias',
        'Service',
        'UsageType',
        'Cost',
        'Unit',
        'StartDate',
        'EndDate',
    ]
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for time_period in results['ResultsByTime']:
            start_date = time_period['TimePeriod']['Start']
            end_date = time_period['TimePeriod']['End']
            for group in time_period.get('Groups', []):
                # Keys follow the GroupBy order: account first, then service
                keys = group.get('Keys', [])
                account_id = keys[0] if len(keys) > 0 else 'Unknown'
                service = keys[1] if len(keys) > 1 else 'Unknown'

                cost = group['Metrics']['BlendedCost']
                cost_amount = cost['Amount']

                # Skip only true zeros; "> 0" would also discard credits.
                if float(cost_amount) == 0:
                    continue

                writer.writerow({
                    'Date': start_date,
                    'AccountId': account_id,
                    # Fall back to the raw ID when no alias is known
                    'AccountAlias': account_aliases.get(account_id, account_id),
                    'Service': service,
                    # Not available with this GroupBy configuration
                    'UsageType': 'N/A',
                    'Cost': cost_amount,
                    'Unit': cost['Unit'],
                    'StartDate': start_date,
                    'EndDate': end_date,
                })
def print_summary(results, account_aliases=None):
    """
    Print the grand total, the per-account totals (highest first) and, under
    each account, its five most expensive services.

    Args:
        results (dict): The cost and usage results
        account_aliases (dict): Mapping of account ID to account name
    """
    aliases = {} if account_aliases is None else account_aliases
    periods = results['ResultsByTime']

    grand_total = 0
    per_account = {}
    per_account_service = {}
    for period in periods:
        for entry in period.get('Groups', []):
            amount = float(entry['Metrics']['BlendedCost']['Amount'])
            grand_total += amount
            keys = entry['Keys']
            if not keys:
                # Counted in the grand total only; no account to attribute to
                continue
            acct = keys[0]
            svc = keys[1] if len(keys) > 1 else 'Unknown'
            per_account[acct] = per_account.get(acct, 0) + amount
            services = per_account_service.setdefault(acct, {})
            services[svc] = services.get(svc, 0) + amount

    print("\n--- Summary ---")
    print(f"Total cost: ${grand_total:.2f}")
    print(f"Number of time periods: {len(periods)}")
    if not per_account:
        return

    print("\nCosts by Account:")
    ranked_accounts = list(per_account.items())
    ranked_accounts.sort(key=lambda pair: pair[1], reverse=True)
    for acct, acct_total in ranked_accounts:
        alias = aliases.get(acct, acct)
        label = acct if alias == acct else f"{alias} ({acct})"
        print(f"\n {label}: ${acct_total:.2f}")
        # Top 5 services per account
        ranked_services = list(per_account_service.get(acct, {}).items())
        ranked_services.sort(key=lambda pair: pair[1], reverse=True)
        for svc, svc_total in ranked_services[:5]:
            print(f" └─ {svc}: ${svc_total:.2f}")
def main():
    """
    Command-line entry point: parse options and run the cost report.

    Exits with argparse's usage error (status 2) when --days is not a
    positive integer, because a zero or negative look-back yields a start
    date that is not before the end date.
    """
    parser = argparse.ArgumentParser(description='Fetch AWS Cost and Usage data with pagination')
    parser.add_argument(
        '--days', type=int, default=30, help='Number of days to look back (default: 30)')
    # The default filenames end with the caller's account ID; say so in the help.
    parser.add_argument(
        '--json-output', type=str,
        help='JSON output filename (default: aws-detailed-usage-{days}-{date}-{account_id}.json)')
    parser.add_argument(
        '--csv-output', type=str,
        help='CSV output filename (default: aws-detailed-usage-{days}-{date}-{account_id}.csv)')
    args = parser.parse_args()

    if args.days < 1:
        parser.error('--days must be a positive integer')

    get_cost_and_usage_paginated(
        days=args.days,
        output_file=args.json_output,
        csv_output=args.csv_output,
    )


if __name__ == '__main__':
    main()
#!/usr/bin/env bash
# Strict mode: stop on the first failing command (-e), treat use of an unset
# variable as an error (-u), and fail a pipeline if any stage fails (pipefail).
set -euo pipefail
# Default values
# Look-back window in days; overridden by --days.
DAYS=30
# Output paths; when left empty, main() derives a name from the look-back
# window, the end date and the account ID.
JSON_OUTPUT=""
CSV_OUTPUT=""
# When "true", log() writes timestamped progress messages to stderr.
VERBOSE=false
# Function to show usage
# Prints the help text (options and examples) to stdout. $0 expands to the
# name the script was invoked with. The heredoc body is emitted verbatim, so
# it must not be re-indented, and the closing EOF has to stay at column 0.
usage() {
cat << EOF
Usage: $0 [OPTIONS]
Fetch AWS Cost and Usage data using AWS CLI
OPTIONS:
--days NUMBER Number of days to look back (default: 30)
--json-output FILE JSON output filename (default: auto-generated)
--csv-output FILE CSV output filename (default: auto-generated)
--verbose Enable verbose output
--help Show this help message
Examples:
$0 # Basic usage (30 days)
$0 --days 7 # Last week
$0 --days 90 --csv-output costs.csv # Custom time and filename
EOF
}
# Parse command line arguments
# Options that take a value first confirm that one actually follows; without
# that check `set -u` aborts on "$2" with an opaque "unbound variable" error.
while [[ $# -gt 0 ]]; do
    case "$1" in
        --days|--json-output|--csv-output)
            if [[ $# -lt 2 ]]; then
                echo "Error: Option $1 requires a value" >&2
                usage >&2
                exit 1
            fi
            case "$1" in
                --days)        DAYS="$2" ;;
                --json-output) JSON_OUTPUT="$2" ;;
                --csv-output)  CSV_OUTPUT="$2" ;;
            esac
            shift 2
            ;;
        --verbose)
            VERBOSE=true
            shift
            ;;
        --help)
            usage
            exit 0
            ;;
        *)
            echo "Error: Unknown option $1" >&2
            usage >&2
            exit 1
            ;;
    esac
done
# DAYS is spliced into date arithmetic later on, so reject anything that is
# not a positive integer before it reaches `date` or the Cost Explorer call.
if ! [[ "$DAYS" =~ ^[1-9][0-9]*$ ]]; then
    echo "Error: --days must be a positive integer (got '$DAYS')" >&2
    exit 1
fi
# Function to log messages
# Writes "[YYYY-MM-DD HH:MM:SS] message" to stderr when VERBOSE is "true" and
# is a silent no-op otherwise.
log() {
    # Return success explicitly in quiet mode so callers running under
    # `set -e` are not aborted by a non-zero status.
    [[ "$VERBOSE" == "true" ]] || return 0
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    printf '[%s] %s\n' "$stamp" "$*" >&2
}
# Function to get current account info
# Prints the caller's AWS account ID as reported by STS; if the lookup fails,
# "unknown" is printed instead. Errors from the aws command are discarded.
get_current_account_info() {
    printf '%s\n' "$(aws sts get-caller-identity --query 'Account' --output text 2>/dev/null || echo "unknown")"
}
# Function to get account aliases from Organizations
# Saves the output of `aws organizations list-accounts` to the file given as
# $1. If the call fails, the file is set to an empty account list so later jq
# lookups still have valid JSON to read. Always returns 0.
get_account_aliases() {
    local out_path="$1"
    log "Fetching account information from Organizations..."
    if ! aws organizations list-accounts --output json > "$out_path" 2>/dev/null; then
        log "Warning: Could not fetch account aliases from Organizations"
        printf '%s\n' '{"Accounts": []}' > "$out_path"
        return 0
    fi
    local n_accounts
    n_accounts=$(jq '.Accounts | length' "$out_path")
    log "Found $n_accounts accounts in organization"
    return 0
}
# Function to convert JSON to CSV
# Arguments:
#   $1  Cost Explorer result JSON (input)
#   $2  CSV file to create (overwritten)
#   $3  Organizations list-accounts JSON, used to resolve account names
# One row is written per (day, account, service) group. Rows whose cost is
# exactly zero are omitted; negative amounts (credits, refunds) are kept so
# the CSV adds up to the same total that the summary prints.
json_to_csv() {
    local json_file="$1"
    local csv_file="$2"
    local accounts_file="$3"
    log "Converting JSON to CSV..."
    # Create CSV header
    echo "Date,AccountId,AccountAlias,Service,UsageType,Cost,Unit,StartDate,EndDate" > "$csv_file"
    # Process each time period
    jq -r --slurpfile accounts "$accounts_file" '
        # Build the Id -> Name lookup once. A plain object lookup returns null
        # for an unknown account, so the fallback below always yields a value.
        # Filtering the account list with select() would yield nothing for an
        # account missing from the list and silently drop the whole row.
        (($accounts[0].Accounts // [])
            | map({key: .Id, value: (.Name // .Id)})
            | from_entries) as $aliases |
        .ResultsByTime[] as $period |
        $period.TimePeriod.Start as $start_date |
        $period.TimePeriod.End as $end_date |
        $period.Groups[]? as $group |
        # Extract account ID and service from keys
        ($group.Keys[0] // "Unknown") as $account_id |
        ($group.Keys[1] // "Unknown") as $service |
        # Account alias, falling back to the account ID
        ($aliases[$account_id] // $account_id) as $account_alias |
        # Get cost information
        $group.Metrics.BlendedCost.Amount as $cost_amount |
        $group.Metrics.BlendedCost.Unit as $cost_unit |
        # Skip only true zeros; credits are negative and must be kept
        select(($cost_amount | tonumber) != 0) |
        [
            $start_date,
            $account_id,
            $account_alias,
            $service,
            "N/A", # UsageType not available with this GroupBy
            $cost_amount,
            $cost_unit,
            $start_date,
            $end_date
        ] | @csv
    ' "$json_file" >> "$csv_file"
}
# Function to print summary
# Arguments:
#   $1  Cost Explorer result JSON
#   $2  Organizations list-accounts JSON, used to resolve account names
# Prints the grand total and the number of time periods, then every account
# (highest cost first), each followed by its five most expensive services.
print_summary() {
    local json_file="$1"
    local accounts_file="$2"
    echo
    echo "--- Summary ---"
    # Calculate total cost
    local total_cost
    total_cost=$(jq -r '[.ResultsByTime[].Groups[]? | .Metrics.BlendedCost.Amount | tonumber] | add' "$json_file")
    # `add` over an empty list yields null
    if [[ "$total_cost" == "null" ]] || [[ -z "$total_cost" ]]; then
        total_cost="0"
    fi
    printf "Total cost: \$%.2f\n" "$total_cost"
    # Get number of time periods
    local time_periods
    time_periods=$(jq '.ResultsByTime | length' "$json_file")
    echo "Number of time periods: ${time_periods}"
    echo
    echo "Costs by Account:"
    # A single pass accumulates the per-account total and the per-service
    # breakdown together, so each account line is followed by its own services.
    jq -r --slurpfile accounts "$accounts_file" '
        def money: (. * 100 | round) / 100 | tostring;
        # Id -> Name lookup; an unknown account resolves to null and falls
        # back to its ID instead of vanishing from the report.
        (($accounts[0].Accounts // [])
            | map({key: .Id, value: (.Name // .Id)})
            | from_entries) as $aliases |
        reduce (.ResultsByTime[].Groups[]? | select((.Keys | length) > 0)) as $group ({};
            $group.Keys[0] as $account_id |
            ($group.Keys[1] // "Unknown") as $service |
            ($group.Metrics.BlendedCost.Amount | tonumber) as $amount |
            .[$account_id].total += $amount |
            .[$account_id].services[$service] += $amount
        ) |
        # Most expensive account first
        to_entries | sort_by(.value.total) | reverse | .[] |
        .key as $account_id |
        .value as $entry |
        ($aliases[$account_id] // $account_id) as $account_alias |
        (
            "",
            " " + (if $account_alias != $account_id
                   then $account_alias + " (" + $account_id + ")"
                   else $account_id end)
                + ": $" + ($entry.total | money),
            ($entry.services | to_entries | sort_by(.value) | reverse | .[0:5][]
                | " └─ " + .key + ": $" + (.value | money))
        )
    ' "$json_file"
}
# Main execution
# Computes the date window from DAYS, resolves the output filenames, fetches
# every page of the Cost Explorer result, writes the JSON and CSV files and
# prints the summary. Exits with status 1 if the Cost Explorer call fails.
main() {
    # Calculate date range
    local end_date start_date
    if command -v gdate > /dev/null 2>&1; then
        # Use GNU date (available via brew install coreutils on macOS)
        end_date=$(gdate +%Y-%m-%d)
        start_date=$(gdate -d "-${DAYS} days" +%Y-%m-%d)
    elif date -v1d > /dev/null 2>&1; then
        # macOS/BSD date
        end_date=$(date +%Y-%m-%d)
        start_date=$(date -v-"${DAYS}"d +%Y-%m-%d)
    else
        # Linux/GNU date
        end_date=$(date +%Y-%m-%d)
        start_date=$(date -d "-${DAYS} days" +%Y-%m-%d)
    fi
    echo "Fetching cost data from $start_date to $end_date"

    # Get current account info for filenames
    local current_account_info
    current_account_info=$(get_current_account_info)

    # Set default output filenames if not provided
    if [[ -z "$JSON_OUTPUT" ]]; then
        JSON_OUTPUT="aws-detailed-usage-${DAYS}-${end_date}-${current_account_info}.json"
    fi
    if [[ -z "$CSV_OUTPUT" ]]; then
        CSV_OUTPUT="aws-detailed-usage-${DAYS}-${end_date}-${current_account_info}.csv"
    fi

    # Create temporary files. The directory name is kept in a global on
    # purpose: the EXIT trap fires after main has returned, when a `local`
    # would already be out of scope and the directory would never be removed.
    COST_REPORT_TMPDIR=$(mktemp -d)
    local accounts_file="${COST_REPORT_TMPDIR}/accounts.json"
    local raw_output="${COST_REPORT_TMPDIR}/raw_output.json"

    # Cleanup function
    cleanup() {
        if [[ -n "${COST_REPORT_TMPDIR:-}" ]] && [[ -d "$COST_REPORT_TMPDIR" ]]; then
            rm -rf "$COST_REPORT_TMPDIR"
        fi
    }
    trap cleanup EXIT

    # Get account information
    get_account_aliases "$accounts_file"

    log "Making Cost Explorer API call..."
    # Fetch every page: keep requesting while the response carries a
    # NextPageToken, saving each page to its own file.
    local next_token="" page_count=0 page_file
    local -a ce_args page_files=()
    while true; do
        page_count=$((page_count + 1))
        log "Fetching page ${page_count}..."
        ce_args=(
            --time-period "Start=${start_date},End=${end_date}"
            --granularity DAILY
            --metrics "BlendedCost"
            --group-by Type=DIMENSION,Key=LINKED_ACCOUNT Type=DIMENSION,Key=SERVICE
            --output json
        )
        if [[ -n "$next_token" ]]; then
            ce_args+=(--next-page-token "$next_token")
        fi
        page_file="${COST_REPORT_TMPDIR}/page-${page_count}.json"
        if ! aws ce get-cost-and-usage "${ce_args[@]}" > "$page_file"; then
            echo "Error: Failed to fetch cost data from AWS Cost Explorer" >&2
            exit 1
        fi
        page_files+=("$page_file")
        next_token=$(jq -r '.NextPageToken // empty' "$page_file")
        if [[ -z "$next_token" ]]; then
            break
        fi
    done

    # Merge the pages: metadata comes from the first page, ResultsByTime is
    # the concatenation of every page, and the page token is dropped.
    jq -s '.[0] + {ResultsByTime: (map(.ResultsByTime // []) | add)} | del(.NextPageToken)' \
        "${page_files[@]}" > "$raw_output"

    # Copy raw output to final JSON file
    cp "$raw_output" "$JSON_OUTPUT"
    echo "JSON results written to $JSON_OUTPUT"

    # Convert to CSV
    json_to_csv "$raw_output" "$CSV_OUTPUT" "$accounts_file"
    echo "CSV results written to $CSV_OUTPUT"

    # Print summary
    print_summary "$raw_output" "$accounts_file"
    log "Script completed successfully"
}
# Check dependencies
# Verifies that the aws and jq commands are on PATH. If any are missing, all
# of them are named in a single error on stderr and the script exits with 1.
check_dependencies() {
    local -a absent=()
    local tool
    for tool in aws jq; do
        if command -v "$tool" > /dev/null 2>&1; then
            continue
        fi
        # The aws binary is reported under the name "aws-cli"
        if [[ "$tool" == "aws" ]]; then
            absent+=("aws-cli")
        else
            absent+=("$tool")
        fi
    done
    if (( ${#absent[@]} == 0 )); then
        return 0
    fi
    echo "Error: Missing required dependencies: ${absent[*]}" >&2
    echo "Please install them and try again." >&2
    exit 1
}
# Run dependency check and main function
# The option-parsing loop earlier in the script has already shifted every
# argument away, so "$@" is empty here; main takes its settings from the
# DAYS, JSON_OUTPUT, CSV_OUTPUT and VERBOSE globals.
check_dependencies
main "$@"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment