Skip to content

Instantly share code, notes, and snippets.

@shollingsworth
Last active April 29, 2022 22:42
Show Gist options
  • Save shollingsworth/a4cf244a8f16ca2ed98652ac56e9666f to your computer and use it in GitHub Desktop.
Save shollingsworth/a4cf244a8f16ca2ed98652ac56e9666f to your computer and use it in GitHub Desktop.
Gives a very ROUGH estimate for AWS costs of unattached volumes since creation. This is only an estimate, and meant to shine a light on possible savings. In my particular case, I used AWS SSO to generate a list of profiles for all accounts in our organization and iterated over those accounts to find unattached EBS volumes. You can modify the script to…
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gives a very ROUGH estimate for costs of unattached volumes since creation.
This is only an estimate, and meant to shine a light on possible savings.
In my particular case, I used AWS sso to generate a list of profiles for all
accounts in our organization and iterated over those accounts to find
unattached EBS volumes.
You can modify the script to fit your needs.
The following environment variables will need to be set:
ORG_PROFILE: The profile to use for the organization to list accounts
SSO_URL: The url to use for SSO
SSO_REGION: The region to use for SSO
SSO_PERMISSION_SET: SSO permission set to use that is linked to all accounts
REGIONS: comma separated list of regions to use
requirements.txt
boto3==1.22.4; python_version >= "3.6"
botocore==1.25.4; python_version >= "3.6"
currencyconverter==0.16.11
jmespath==1.0.0; python_version >= "3.7"
numpy==1.22.3
pandas==1.4.2; python_version >= "3.8"
python-dateutil==2.8.2; python_version >= "3.8" and python_full_version < "3.0.0" or python_full_version >= "3.3.0" and python_version >= "3.8"
pytz==2022.1; python_version >= "3.8"
s3transfer==0.5.2; python_version >= "3.6"
six==1.16.0; python_version >= "3.8" and python_full_version < "3.0.0" or python_full_version >= "3.3.0" and python_version >= "3.8"
urllib3==1.26.9; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version < "4" and python_version >= "3.6"
"""
from configparser import ConfigParser
from dataclasses import dataclass
import datetime
import json
import os
from pathlib import Path
import re
from typing import Iterable, List, Optional, Tuple
import boto3
from currency_converter import CurrencyConverter
import pandas as pd
# pyright: reportGeneralTypeIssues=false
# --- Configuration from environment -------------------------------------
# ORG_PROFILE falls back to "default"; the others are required (checked below).
ORG_PROFILE = os.environ.get("ORG_PROFILE", "default")
SSO_URL = os.environ.get("SSO_URL", "")
SSO_REGION = os.environ.get("SSO_REGION", "")
SSO_PERMISSION_SET = os.environ.get("SSO_PERMISSION_SET", "")
# BUG FIX: this previously read the non-existent "REGION" variable, even
# though the module docstring and the required-variable check below both
# name it "REGIONS" — so the region list was always [""].
REGIONS = os.environ.get("REGIONS", "").split(",")
missing = [
    i
    for i in ["SSO_URL", "SSO_REGION", "SSO_PERMISSION_SET", "REGIONS"]
    if i not in os.environ
]
if missing:
    print(f"Missing environment variables: {missing}")
    exit(1)
# boto3 Session cache keyed by profile name; used by profile_session().
PROFILE_CACHE = {}
# Generated AWS config file holding one SSO profile per org account.
AWS_CONFIG_FILE = Path(__file__).parent / "aws_profiles"
# On-disk cache for the price map and the per-profile volume listings.
CACHE_DIR = Path(__file__).parent / "cache"
CACHE_DIR.mkdir(exist_ok=True)
CONFIG = ConfigParser()
@dataclass(init=False)
class Vol:
    """An EBS volume record plus the pricing/profile metadata added in main().

    Field names mirror the keys of an EC2 ``describe_volumes`` entry
    (``CreateTime`` having been converted to an epoch float), plus
    ``profile``, ``price`` (USD per GB-month, kept as a string) and
    ``region``, which are attached by the caller.
    """

    def __init__(self, **kwargs) -> None:
        """Accept any subset of the declared fields as keyword arguments."""
        self.__dict__.update(kwargs)

    # BUG FIX: the default used to be the ``list`` *type object* itself
    # (``= list``), not an empty list; ``None`` is a safer sentinel.
    Attachments: Optional[List] = None
    AvailabilityZone: Optional[str] = ""
    CreateTime: Optional[float] = None
    Encrypted: Optional[int] = None
    Size: Optional[int] = None
    SnapshotId: Optional[str] = ""
    State: Optional[str] = ""
    VolumeId: Optional[str] = ""
    Iops: Optional[int] = None
    VolumeType: Optional[str] = ""
    MultiAttachEnabled: Optional[int] = None
    profile: Optional[str] = ""
    price: Optional[str] = ""
    region: Optional[str] = ""

    @property
    def dt(self) -> datetime.datetime:
        """Creation time as a naive local datetime."""
        return datetime.datetime.fromtimestamp(self.CreateTime)

    def costs(self) -> Iterable[Tuple[datetime.datetime, datetime.datetime, float]]:
        """Yield ``(period_start, period_end, cost)`` per calendar month.

        A full month costs ``Size * price``; the creation month is
        pro-rated by the fraction of the month remaining after creation.
        """

        def _get_nm(dt: datetime.datetime) -> datetime.datetime:
            """Return the first instant of the month after *dt*."""
            return (dt.replace(day=1) + datetime.timedelta(days=32)).replace(
                day=1, hour=0, minute=0, second=0, microsecond=0
            )

        dt = self.dt
        # Walk month-by-month from creation up to "now".
        while dt < datetime.datetime.now():
            start_month = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
            eom = _get_nm(dt)
            total = (eom - start_month).total_seconds()
            sec = (eom - dt).total_seconds()
            frac = float(sec / total)
            yield dt, eom, (float(self.Size) * float(self.price)) * frac
            dt = eom

    @property
    def total_cost(self) -> float:
        """Estimated total cost since creation (sum of monthly costs)."""
        return sum(cost for _, _, cost in self.costs())
def profile_session(profile: str):
    """Return a cached boto3 Session for *profile*, creating it on first use."""
    try:
        return PROFILE_CACHE[profile]
    except KeyError:
        sess = PROFILE_CACHE[profile] = boto3.Session(profile_name=profile)
        return sess
def normalize_name(acct_name: str):
    """Lower-case *acct_name* and join its alphanumeric runs with dashes."""
    tokens = re.split(r"[^a-zA-Z0-9]", acct_name)
    return "-".join(tok for tok in tokens if tok.strip()).lower()
def gen_profile_file(billing_sess: boto3.Session):
    """Write an AWS config file with one SSO profile per active org account."""
    org = billing_sess.client("organizations")
    paginator = org.get_paginator("list_accounts")
    for page in paginator.paginate():
        for acct in page["Accounts"]:
            # Only active accounts get a profile section.
            if acct["Status"] != "ACTIVE":
                continue
            acct_id = acct["Id"]
            profile_name = f"{normalize_name(acct['Name'])}-{acct_id}"
            section = f"profile {profile_name}"
            print(f"Creating profile {profile_name}")
            CONFIG.add_section(section)
            settings = {
                "sso_start_url": SSO_URL,
                "sso_region": SSO_REGION,
                "sso_account_id": str(acct_id),
                "sso_role_name": SSO_PERMISSION_SET,
            }
            for key, value in settings.items():
                CONFIG.set(section, key, value)
    # Persist all collected sections once, after the scan completes.
    with open(AWS_CONFIG_FILE, "w") as f:
        CONFIG.write(f)
def get_profiles(config: ConfigParser):
    """Return profile names from every ``[profile <name>]`` section of *config*.

    BUG FIX: the original tested ``startswith("profile")`` (no space) and
    then ``replace("profile ", "")``, which mangles a section named e.g.
    ``[profilex]`` and would strip *every* occurrence of the substring.
    Matching the exact ``"profile "`` prefix and slicing it off is safe.
    """
    prefix = "profile "
    return [
        section[len(prefix):]
        for section in config.sections()
        if section.startswith(prefix)
    ]
def iter_ebs_price_map(profile: str):
    """Yield ``(volume_type, region, price)`` for EBS storage in REGIONS.

    Prices quoted in currencies other than USD are converted to USD.
    """
    session = profile_session(profile)
    converter = CurrencyConverter()
    pricing = session.client("pricing", region_name="us-east-1")
    paginator = pricing.get_paginator("get_products")
    storage_filter = [
        {"Type": "TERM_MATCH", "Field": "productFamily", "Value": "Storage"},
    ]
    for page in paginator.paginate(ServiceCode="AmazonEC2", Filters=storage_filter):
        for raw in page["PriceList"]:
            product = json.loads(raw)
            attrs = product["product"]["attributes"]
            vol_type = attrs["volumeApiName"]
            region = attrs["regionCode"]
            # Skip regions we aren't reporting on.
            if region not in REGIONS:
                continue
            for term in product["terms"]["OnDemand"].values():
                for dim in term["priceDimensions"].values():
                    currency, amount = next(iter(dim["pricePerUnit"].items()))
                    if currency == "USD":
                        yield vol_type, region, amount
                    else:
                        yield vol_type, region, converter.convert(
                            amount, currency, "USD"
                        )
def ebs_price_map():
    """Return ``{region: {volume_type: price}}``, cached on disk as JSON.

    NOTE: iter_ebs_price_map yields ``(volume_type, region, price)``; the
    map is keyed region-first, matching the ``p_map[region][vt]`` lookups
    in ``main()``.
    """
    cache_file = CACHE_DIR.joinpath("ebs_price_map.json")
    if cache_file.exists():
        with open(cache_file, "r") as fh:
            return json.load(fh)
    price_map = {}
    for vol_type, region, price in iter_ebs_price_map(ORG_PROFILE):
        print("price map: {} {} {}".format(vol_type, region, price))
        price_map.setdefault(region, {})[vol_type] = price
    with open(cache_file, "w") as fh:
        json.dump(price_map, fh)
    return price_map
def iter_vols(profile: str):
    """Return all unattached ("available") EBS volumes for *profile*.

    ``CreateTime`` is converted to an epoch float so the result is
    JSON-serializable for the on-disk cache.
    """

    def _unattached(session: boto3.Session, region: str):
        """Yield volumes in *region* that are available and have no attachments."""
        ec2 = session.client("ec2", region_name=region)
        pag = ec2.get_paginator("describe_volumes")
        for page in pag.paginate():
            for vol in page["Volumes"]:
                if vol["State"] == "available":  # type: ignore
                    if not vol["Attachments"]:  # type: ignore
                        yield vol

    # FIX: the session is per-profile, not per-region — create it once
    # instead of calling profile_session() inside the region loop.
    sess = profile_session(profile)
    retval = []
    for region in REGIONS:
        for vol in _unattached(sess, region):
            vol["CreateTime"] = vol["CreateTime"].timestamp()  # type: ignore
            print(f'{profile} {region} {vol["VolumeId"]} {vol["CreateTime"]}')  # type: ignore
            retval.append(vol)
    return retval
def _collect_volumes(profiles):
    """Fetch unattached volumes per profile, caching each result as JSON."""
    for profile in profiles:
        print(f"Processing profile {profile}")
        vcache_file = CACHE_DIR.joinpath(f"{profile}_volumes.json")
        if vcache_file.exists():
            print("Skipping existing cache file")
            continue
        vals = iter_vols(profile)
        with open(vcache_file, "w") as f:
            json.dump(vals, f)


def _load_rows(profiles, p_map):
    """Load cached volumes, attach pricing metadata, and build report rows."""
    rows = []
    for profile in profiles:
        vcache_file = CACHE_DIR.joinpath(f"{profile}_volumes.json")
        with open(vcache_file, "r") as f:
            vols = json.load(f)
        for vol in vols:
            # Region is the availability zone minus its trailing letter.
            region = vol["AvailabilityZone"][:-1]
            vol["profile"] = profile
            vol["price"] = p_map[region][vol["VolumeType"]]
            vol["region"] = region
            dat = Vol(**vol)
            rows.append(
                {
                    "profile": dat.profile,
                    "region": dat.region,
                    "volume_id": dat.VolumeId,
                    "volume_type": dat.VolumeType,
                    "size": dat.Size,
                    "create_time": dat.dt.isoformat(),
                    "total_cost": dat.total_cost,
                }
            )
    return rows


def main():
    """Run main function: collect unattached EBS volumes and write a CSV report."""
    billing_sess = profile_session(ORG_PROFILE)
    p_map = ebs_price_map()
    # Point the AWS SDK at our generated per-account profile file.
    os.environ.update({"AWS_CONFIG_FILE": str(AWS_CONFIG_FILE)})
    if not AWS_CONFIG_FILE.exists():
        gen_profile_file(billing_sess)
    CONFIG.read(AWS_CONFIG_FILE)
    profiles = get_profiles(CONFIG)
    _collect_volumes(profiles)
    rows = _load_rows(profiles, p_map)
    df = pd.DataFrame(rows)
    dest_file = "unattached_ebs_volume_cost_estimate.csv"
    df.to_csv(
        dest_file,
        index=False,
        columns=[
            "profile",
            "region",
            "volume_id",
            "volume_type",
            "size",
            "create_time",
            "total_cost",
        ],
    )
    print(f"Wrote {len(rows)} rows to {dest_file}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment