Created
June 9, 2025 17:28
-
-
Save huevos-y-bacon/45c4d0b5e2f470c54b9afb154f456764 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import boto3 | |
| import json | |
| import csv | |
| from datetime import datetime, timedelta | |
| import argparse | |
| import sys | |
def get_cost_and_usage_paginated(days=30, output_file=None, csv_output=None):
    """
    Fetch AWS Cost and Usage data with pagination and write JSON/CSV reports.

    Args:
        days (int): Number of days to look back
        output_file (str): JSON output filename (optional; auto-generated)
        csv_output (str): CSV output filename (optional; auto-generated)
    """
    # Calculate date range. NOTE(review): Cost Explorer treats 'End' as
    # exclusive and operates on UTC dates, while datetime.now() is local
    # time — confirm the possible one-day skew near midnight is acceptable.
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=days)
    start_str = start_date.strftime('%Y-%m-%d')
    end_str = end_date.strftime('%Y-%m-%d')
    print(f"Fetching cost data from {start_str} to {end_str}")

    # Initialize boto3 clients
    try:
        ce_client = boto3.client('ce')
        orgs_client = boto3.client('organizations')
    except Exception as e:
        print(f"Error initializing AWS client: {e}")
        print("Make sure your AWS credentials are configured properly.")
        sys.exit(1)

    # Get account aliases and current account info
    print("Fetching account information...")
    account_aliases = get_account_aliases(orgs_client)
    current_account_info = get_current_account_info()

    # Fetch every page of results (exits the process on API failure).
    all_results, page_count = _fetch_all_cost_pages(ce_client, start_str, end_str)
    print(f"Successfully fetched {len(all_results['ResultsByTime'])} time periods across {page_count} pages")

    # Generate output filenames if not provided
    if not output_file:
        output_file = f"aws-detailed-usage-{days}-{end_str}-{current_account_info}.json"
    if not csv_output:
        csv_output = f"aws-detailed-usage-{days}-{end_str}-{current_account_info}.csv"

    # Write results to JSON file
    try:
        with open(output_file, 'w') as f:
            json.dump(all_results, f, indent=2, default=str)
        print(f"JSON results written to {output_file}")
    except Exception as e:
        print(f"Error writing JSON output file: {e}")
        sys.exit(1)

    # Write results to CSV file
    try:
        write_csv_output(all_results, csv_output, account_aliases)
        print(f"CSV results written to {csv_output}")
    except Exception as e:
        print(f"Error writing CSV output file: {e}")
        sys.exit(1)

    # Print summary statistics
    print_summary(all_results, account_aliases)


def _fetch_all_cost_pages(ce_client, start_str, end_str):
    """
    Pull every page of get_cost_and_usage results and merge them.

    Args:
        ce_client: boto3 Cost Explorer client
        start_str (str): Inclusive start date, YYYY-MM-DD
        end_str (str): Exclusive end date, YYYY-MM-DD

    Returns:
        tuple: (merged results dict, number of pages fetched).
        Exits the process with status 1 if the API call fails.
    """
    request_params = {
        'TimePeriod': {
            'Start': start_str,
            'End': end_str
        },
        'Granularity': 'DAILY',
        'Metrics': ['BlendedCost'],
        'GroupBy': [
            {'Type': 'DIMENSION', 'Key': 'LINKED_ACCOUNT'},
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
        ],
    }
    all_results = {
        'ResultsByTime': [],
        'GroupDefinitions': [],
        'DimensionKey': None,
    }
    next_page_token = None
    page_count = 0
    try:
        while True:
            page_count += 1
            print(f"Fetching page {page_count}...")
            # Pass the pagination token on follow-up requests only.
            # (The original deleted and re-added the token on each
            # iteration; setting it in one place is equivalent and clearer.)
            if next_page_token:
                request_params['NextPageToken'] = next_page_token
            else:
                request_params.pop('NextPageToken', None)
            response = ce_client.get_cost_and_usage(**request_params)
            all_results['ResultsByTime'].extend(response.get('ResultsByTime', []))
            # Metadata is identical on every page; keep the first copy.
            if page_count == 1:
                all_results['GroupDefinitions'] = response.get('GroupDefinitions', [])
                all_results['DimensionKey'] = response.get('DimensionKey')
            next_page_token = response.get('NextPageToken')
            if not next_page_token:
                break
    except Exception as e:
        print(f"Error fetching cost data: {e}")
        sys.exit(1)
    return all_results, page_count
def get_current_account_info():
    """
    Resolve the caller's AWS account ID via STS.

    Returns:
        str: The 12-digit account ID, or "unknown-account" when the
        lookup fails for any reason (missing credentials, no network, ...).
    """
    try:
        sts = boto3.client('sts')
        return sts.get_caller_identity()['Account']
    except Exception as e:
        # Best-effort: the ID is only used for output filenames, so a
        # failure degrades to a placeholder instead of aborting.
        print(f"Warning: Could not get current account info: {e}")
        return "unknown-account"
def get_account_aliases(orgs_client):
    """
    Build a mapping of account ID to account name via AWS Organizations.

    Args:
        orgs_client: boto3 Organizations client

    Returns:
        dict: account ID -> account name; empty when Organizations is
        unavailable (callers then fall back to displaying raw IDs).
    """
    aliases = {}
    try:
        for page in orgs_client.get_paginator('list_accounts').paginate():
            for acct in page['Accounts']:
                # Fall back to the ID when an account has no Name field.
                aliases[acct['Id']] = acct.get('Name', acct['Id'])
        print(f"Found {len(aliases)} accounts in organization")
    except Exception as e:
        print(f"Warning: Could not fetch account aliases from Organizations: {e}")
        print("Account aliases will show as account IDs")
    return aliases
def write_csv_output(results, csv_filename, account_aliases=None):
    """
    Write cost data to CSV, one row per (day, account, service) group.

    Args:
        results (dict): Merged cost and usage results ('ResultsByTime' list)
        csv_filename (str): Output CSV filename
        account_aliases (dict): Mapping of account ID to account name
    """
    if account_aliases is None:
        account_aliases = {}

    fieldnames = [
        'Date',
        'AccountId',
        'AccountAlias',
        'Service',
        'UsageType',
        'Cost',
        'Unit',
        'StartDate',
        'EndDate'
    ]

    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for time_period in results['ResultsByTime']:
            start_date = time_period['TimePeriod']['Start']
            end_date = time_period['TimePeriod']['End']
            for group in time_period.get('Groups', []):
                # Keys order matches the GroupBy request:
                # [LINKED_ACCOUNT, SERVICE] (only 2 GroupBy dimensions allowed)
                keys = group.get('Keys', [])
                account_id = keys[0] if len(keys) > 0 else 'Unknown'
                service = keys[1] if len(keys) > 1 else 'Unknown'
                cost_amount = group['Metrics']['BlendedCost']['Amount']
                cost_unit = group['Metrics']['BlendedCost']['Unit']
                # Skip only exactly-zero rows. The previous `> 0` filter
                # also dropped negative amounts (credits/refunds), which
                # contradicted the stated "non-zero" intent and skewed
                # any totals computed from the CSV.
                if float(cost_amount) != 0:
                    writer.writerow({
                        'Date': start_date,
                        'AccountId': account_id,
                        'AccountAlias': account_aliases.get(account_id, account_id),
                        'Service': service,
                        # UsageType is not available with this GroupBy config
                        'UsageType': 'N/A',
                        'Cost': cost_amount,
                        'Unit': cost_unit,
                        'StartDate': start_date,
                        'EndDate': end_date
                    })
def print_summary(results, account_aliases=None):
    """Print total cost, per-account totals, and top-5 services per account."""
    if account_aliases is None:
        account_aliases = {}

    total_cost = 0
    account_costs = {}
    account_service_costs = {}

    # Aggregate every group's cost; Keys order is [account, service].
    for period in results['ResultsByTime']:
        for group in period.get('Groups', []):
            amount = float(group['Metrics']['BlendedCost']['Amount'])
            total_cost += amount
            keys = group['Keys']
            if not keys:
                continue
            acct = keys[0]
            svc = keys[1] if len(keys) > 1 else 'Unknown'
            account_costs[acct] = account_costs.get(acct, 0) + amount
            per_service = account_service_costs.setdefault(acct, {})
            per_service[svc] = per_service.get(svc, 0) + amount

    print(f"\n--- Summary ---")
    print(f"Total cost: ${total_cost:.2f}")
    print(f"Number of time periods: {len(results['ResultsByTime'])}")

    if account_costs:
        print(f"\nCosts by Account:")
        # Highest-spending accounts first.
        for acct, acct_cost in sorted(account_costs.items(), key=lambda kv: kv[1], reverse=True):
            alias = account_aliases.get(acct, acct)
            label = acct if alias == acct else f"{alias} ({acct})"
            print(f"\n {label}: ${acct_cost:.2f}")
            # Top 5 services for this account, highest cost first.
            ranked = sorted(account_service_costs.get(acct, {}).items(),
                            key=lambda kv: kv[1], reverse=True)
            for svc, svc_cost in ranked[:5]:
                print(f" └─ {svc}: ${svc_cost:.2f}")
def main():
    """CLI entry point: parse arguments and run the paginated cost report."""
    parser = argparse.ArgumentParser(
        description='Fetch AWS Cost and Usage data with pagination')
    parser.add_argument(
        '--days', type=int, default=30,
        help='Number of days to look back (default: 30)')
    parser.add_argument(
        '--json-output', type=str,
        help='JSON output filename (default: aws-detailed-usage-{days}-{date}.json)')
    parser.add_argument(
        '--csv-output', type=str,
        help='CSV output filename (default: aws-detailed-usage-{days}-{date}.csv)')
    opts = parser.parse_args()
    get_cost_and_usage_paginated(
        days=opts.days,
        output_file=opts.json_output,
        csv_output=opts.csv_output,
    )
# Run the CLI only when executed as a script (not when imported).
if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Fetch AWS Cost and Usage data via the AWS CLI and emit JSON + CSV reports.
# Fail fast: -e aborts on errors, -u rejects unset variables, pipefail
# propagates failures through pipelines.
set -euo pipefail
# Default values (overridable via the command-line flags parsed below)
DAYS=30
JSON_OUTPUT=""
CSV_OUTPUT=""
VERBOSE=false
| # Function to show usage | |
| usage() { | |
| cat << EOF | |
| Usage: $0 [OPTIONS] | |
| Fetch AWS Cost and Usage data using AWS CLI | |
| OPTIONS: | |
| --days NUMBER Number of days to look back (default: 30) | |
| --json-output FILE JSON output filename (default: auto-generated) | |
| --csv-output FILE CSV output filename (default: auto-generated) | |
| --verbose Enable verbose output | |
| --help Show this help message | |
| Examples: | |
| $0 # Basic usage (30 days) | |
| $0 --days 7 # Last week | |
| $0 --days 90 --csv-output costs.csv # Custom time and filename | |
| EOF | |
| } | |
| # Parse command line arguments | |
| while [[ $# -gt 0 ]]; do | |
| case $1 in | |
| --days) | |
| DAYS="$2" | |
| shift 2 | |
| ;; | |
| --json-output) | |
| JSON_OUTPUT="$2" | |
| shift 2 | |
| ;; | |
| --csv-output) | |
| CSV_OUTPUT="$2" | |
| shift 2 | |
| ;; | |
| --verbose) | |
| VERBOSE=true | |
| shift | |
| ;; | |
| --help) | |
| usage | |
| exit 0 | |
| ;; | |
| *) | |
| echo "Error: Unknown option $1" | |
| usage | |
| exit 1 | |
| ;; | |
| esac | |
| done | |
| # Function to log messages | |
| log() { | |
| if [[ "$VERBOSE" == "true" ]]; then | |
| echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >&2 | |
| fi | |
| } | |
| # Function to get current account info | |
| get_current_account_info() { | |
| local account_id | |
| # Get account ID | |
| account_id=$(aws sts get-caller-identity --query 'Account' --output text 2>/dev/null || echo "unknown") | |
| echo "$account_id" | |
| } | |
| # Function to get account aliases from Organizations | |
| get_account_aliases() { | |
| local temp_file="$1" | |
| log "Fetching account information from Organizations..." | |
| if aws organizations list-accounts --output json > "$temp_file" 2>/dev/null; then | |
| local account_count | |
| account_count=$(jq '.Accounts | length' "$temp_file") | |
| log "Found $account_count accounts in organization" | |
| return 0 | |
| else | |
| log "Warning: Could not fetch account aliases from Organizations" | |
| echo '{"Accounts": []}' > "$temp_file" | |
| return 0 | |
| fi | |
| } | |
| # Function to convert JSON to CSV | |
| json_to_csv() { | |
| local json_file="$1" | |
| local csv_file="$2" | |
| local accounts_file="$3" | |
| log "Converting JSON to CSV..." | |
| # Create CSV header | |
| echo "Date,AccountId,AccountAlias,Service,UsageType,Cost,Unit,StartDate,EndDate" > "$csv_file" | |
| # Process each time period | |
| jq -r --slurpfile accounts "$accounts_file" ' | |
| .ResultsByTime[] as $period | | |
| $period.TimePeriod.Start as $start_date | | |
| $period.TimePeriod.End as $end_date | | |
| $period.Groups[] as $group | | |
| # Extract account ID and service from keys | |
| ($group.Keys[0] // "Unknown") as $account_id | | |
| ($group.Keys[1] // "Unknown") as $service | | |
| # Get account alias from organizations data (fallback to account_id if not found) | |
| (if ($accounts[0].Accounts | length) > 0 then | |
| ($accounts[0].Accounts[] | select(.Id == $account_id) | .Name // $account_id) | |
| else | |
| $account_id | |
| end) as $account_alias | | |
| # Get cost information | |
| $group.Metrics.BlendedCost.Amount as $cost_amount | | |
| $group.Metrics.BlendedCost.Unit as $cost_unit | | |
| # Only output non-zero costs | |
| if ($cost_amount | tonumber) > 0 then | |
| [ | |
| $start_date, | |
| $account_id, | |
| $account_alias, | |
| $service, | |
| "N/A", # UsageType not available with this GroupBy | |
| $cost_amount, | |
| $cost_unit, | |
| $start_date, | |
| $end_date | |
| ] | @csv | |
| else | |
| empty | |
| end | |
| ' "$json_file" >> "$csv_file" | |
| } | |
| # Function to print summary | |
| print_summary() { | |
| local json_file="$1" | |
| local accounts_file="$2" | |
| echo | |
| echo "--- Summary ---" | |
| # Calculate total cost | |
| local total_cost | |
| total_cost=$(jq -r '[.ResultsByTime[].Groups[].Metrics.BlendedCost.Amount | tonumber] | add' "$json_file") | |
| if [[ "$total_cost" == "null" ]] || [[ -z "$total_cost" ]]; then | |
| total_cost="0" | |
| fi | |
| printf "Total cost: \$%.2f\n" "$total_cost" | |
| # Get number of time periods | |
| local time_periods | |
| time_periods=$(jq '.ResultsByTime | length' "$json_file") | |
| echo "Number of time periods: ${time_periods}" | |
| echo | |
| echo "Costs by Account:" | |
| # Use a simpler approach - process the data directly without temp files | |
| local account_summary | |
| account_summary=$(jq -r --slurpfile accounts "$accounts_file" ' | |
| # Group costs by account | |
| reduce .ResultsByTime[].Groups[] as $group ({}; | |
| . + {($group.Keys[0]): ((.[$group.Keys[0]] // 0) + ($group.Metrics.BlendedCost.Amount | tonumber))}) | | |
| # Sort by value and format output | |
| to_entries | sort_by(.value) | reverse[] | | |
| .key as $account_id | | |
| .value as $account_cost | | |
| (if ($accounts[0].Accounts | length) > 0 then | |
| ($accounts[0].Accounts[] | select(.Id == $account_id) | .Name // $account_id) | |
| else | |
| $account_id | |
| end) as $account_alias | | |
| # Format account line | |
| ["", (if $account_alias != $account_id then " " + $account_alias + " (" + $account_id + ")" else " " + $account_id end) + ": $" + (($account_cost * 100 | round) / 100 | tostring)] | .[] | |
| ' "$json_file") | |
| echo "$account_summary" | |
| # Now get service breakdown for each account | |
| jq -r --slurpfile accounts "$accounts_file" ' | |
| # Group costs by account and service | |
| reduce .ResultsByTime[].Groups[] as $group ({}; | |
| .[$group.Keys[0]][$group.Keys[1]] = ((.[$group.Keys[0]][$group.Keys[1]] // 0) + ($group.Metrics.BlendedCost.Amount | tonumber))) | | |
| # Process each account | |
| to_entries[] | | |
| .key as $account_id | | |
| .value | to_entries | sort_by(.value) | reverse | .[0:5][] | | |
| " └─ " + .key + ": $" + ((.value * 100 | round) / 100 | tostring) | |
| ' "$json_file" | |
| } | |
| # Main execution | |
| main() { | |
| # Calculate date range | |
| local end_date start_date | |
| if command -v gdate > /dev/null 2>&1; then | |
| # Use GNU date (available via brew install coreutils on macOS) | |
| end_date=$(gdate +%Y-%m-%d) | |
| start_date=$(gdate -d "-${DAYS} days" +%Y-%m-%d) | |
| elif date -v1d > /dev/null 2>&1; then | |
| # macOS/BSD date | |
| end_date=$(date +%Y-%m-%d) | |
| start_date=$(date -v-${DAYS}d +%Y-%m-%d) | |
| else | |
| # Linux/GNU date | |
| end_date=$(date +%Y-%m-%d) | |
| start_date=$(date -d "-${DAYS} days" +%Y-%m-%d) | |
| fi | |
| echo "Fetching cost data from $start_date to $end_date" | |
| # Get current account info for filenames | |
| local current_account_info | |
| current_account_info=$(get_current_account_info) | |
| # Set default output filenames if not provided | |
| if [[ -z "$JSON_OUTPUT" ]]; then | |
| JSON_OUTPUT="aws-detailed-usage-${DAYS}-${end_date}-${current_account_info}.json" | |
| fi | |
| if [[ -z "$CSV_OUTPUT" ]]; then | |
| CSV_OUTPUT="aws-detailed-usage-${DAYS}-${end_date}-${current_account_info}.csv" | |
| fi | |
| # Create temporary files | |
| local temp_dir | |
| temp_dir=$(mktemp -d) | |
| local accounts_file="${temp_dir}/accounts.json" | |
| local raw_output="${temp_dir}/raw_output.json" | |
| # Cleanup function | |
| cleanup() { | |
| if [[ -n "${temp_dir:-}" ]] && [[ -d "$temp_dir" ]]; then | |
| rm -rf "$temp_dir" | |
| fi | |
| } | |
| trap cleanup EXIT | |
| # Get account information | |
| get_account_aliases "$accounts_file" | |
| log "Making Cost Explorer API call..." | |
| # Make the AWS Cost Explorer API call | |
| if ! aws ce get-cost-and-usage \ | |
| --time-period Start="$start_date",End="$end_date" \ | |
| --granularity DAILY \ | |
| --metrics "BlendedCost" \ | |
| --group-by Type=DIMENSION,Key=LINKED_ACCOUNT Type=DIMENSION,Key=SERVICE \ | |
| --output json > "$raw_output"; then | |
| echo "Error: Failed to fetch cost data from AWS Cost Explorer" >&2 | |
| exit 1 | |
| fi | |
| # Copy raw output to final JSON file | |
| cp "$raw_output" "$JSON_OUTPUT" | |
| echo "JSON results written to $JSON_OUTPUT" | |
| # Convert to CSV | |
| json_to_csv "$raw_output" "$CSV_OUTPUT" "$accounts_file" | |
| echo "CSV results written to $CSV_OUTPUT" | |
| # Print summary | |
| print_summary "$raw_output" "$accounts_file" | |
| log "Script completed successfully" | |
| } | |
| # Check dependencies | |
| check_dependencies() { | |
| local missing_deps=() | |
| if ! command -v aws > /dev/null 2>&1; then | |
| missing_deps+=("aws-cli") | |
| fi | |
| if ! command -v jq > /dev/null 2>&1; then | |
| missing_deps+=("jq") | |
| fi | |
| if [[ ${#missing_deps[@]} -gt 0 ]]; then | |
| echo "Error: Missing required dependencies: ${missing_deps[*]}" >&2 | |
| echo "Please install them and try again." >&2 | |
| exit 1 | |
| fi | |
| } | |
| # Run dependency check and main function | |
| check_dependencies | |
| main "$@" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment