Skip to content

Instantly share code, notes, and snippets.

@xlbruce
Last active October 26, 2018 19:39
Show Gist options
  • Save xlbruce/920f698f91d2d8f735be8adfbd8cf432 to your computer and use it in GitHub Desktop.
Save xlbruce/920f698f91d2d8f735be8adfbd8cf432 to your computer and use it in GitHub Desktop.
Save EC2, RDS and ElastiCache prices into CSV files.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv
import functools
import json
import locale
import os

import boto3
locale.setlocale(locale.LC_ALL,'')
def filter_by(field, value):
    """Build one ``TERM_MATCH`` filter entry for a pricing query.

    Args:
        field: Name of the product attribute to filter on.
        value: Value to match for that attribute.

    Returns:
        A dict with the keys ``Field``, ``Value`` and ``Type``; ``Type`` is
        always ``"TERM_MATCH"``.
    """
    return dict(Field=field, Value=value, Type="TERM_MATCH")
def is_valid(product_term, purchase_option):
    """Return whether a reserved term is a one-year offer for *purchase_option*.

    Args:
        product_term: Term dict holding a ``termAttributes`` mapping with the
            keys ``LeaseContractLength``, ``PurchaseOption`` and
            ``OfferingClass``.
        purchase_option: The accepted purchase option as a string, or a
            collection of accepted purchase options.

    Returns:
        True when the lease length is exactly ``'1yr'``, the term's purchase
        option is accepted, and the offering class is ``'convertible'`` or
        ``'standard'``; False otherwise.

    Raises:
        KeyError: If ``termAttributes`` or one of the keys that gets
            evaluated is missing.
    """
    term_attr = product_term['termAttributes']
    # ``in`` between two strings is a substring test, which would also accept
    # partial values such as '1' or 'Upfront'; compare whole values instead.
    if isinstance(purchase_option, str):
        accepted_options = (purchase_option,)
    else:
        accepted_options = purchase_option
    return (
        term_attr['LeaseContractLength'] == '1yr'
        and term_attr['PurchaseOption'] in accepted_options
        and term_attr['OfferingClass'] in ('convertible', 'standard')
    )
def monthly_value(f):
    """Decorate a price function so that it returns a monthly amount.

    The wrapped function's result is converted to ``float``, multiplied by
    730 and rounded to two decimal places.

    Args:
        f: Callable returning a number or a numeric string.

    Returns:
        A wrapper with the same name, docstring and call signature as *f*
        that returns the scaled, rounded ``float``.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        result = float(f(*args, **kwargs))
        # 730 is presumably the average number of hours in a month
        # (8760 / 12), turning an hourly rate into a monthly one.
        return round(result * 730, ndigits=2)
    return wrapper
def currency_format(f):
    """Decorate a price function so that it returns a currency string.

    The wrapped function's result is converted to ``float`` and formatted
    with ``locale.currency`` using the currently active locale.

    Args:
        f: Callable returning a number or a numeric string.

    Returns:
        A wrapper with the same name, docstring and call signature as *f*
        that returns the formatted string.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        result = float(f(*args, **kwargs))
        return locale.currency(result)
    return wrapper
def offering_class(term):
    """Sort key that ranks convertible terms ahead of every other class.

    Args:
        term: Term dict whose ``termAttributes`` mapping has an
            ``OfferingClass`` entry.

    Returns:
        0 when the offering class is ``'convertible'``, otherwise 1.
    """
    is_convertible = term['termAttributes']['OfferingClass'] == 'convertible'
    return 0 if is_convertible else 1
@monthly_value
def on_demand(terms):
    """Return the USD unit price of the first on-demand price dimension.

    Takes the first entry of ``terms['OnDemand']`` and, within it, the first
    entry of its ``priceDimensions``.  The ``monthly_value`` decorator then
    converts the result to a monthly figure.

    Args:
        terms: Mapping with an ``OnDemand`` entry.

    Raises:
        KeyError: If an expected key is missing.
        StopIteration: If ``OnDemand`` or its ``priceDimensions`` is empty.
    """
    first_offer = next(iter(terms['OnDemand'].values()))
    first_dimension = next(iter(first_offer['priceDimensions'].values()))
    return first_dimension['pricePerUnit']['USD']
@monthly_value
def no_upfront(terms):
    """Return the recurring USD unit price of the 'No Upfront' reserved term.

    Uses the first term returned by ``sorted_reserved_terms`` and the first
    of its price dimensions whose description is not ``'Upfront Fee'``.  The
    ``monthly_value`` decorator converts the result to a monthly figure.

    Args:
        terms: Mapping of pricing terms for one product.

    Returns:
        The ``pricePerUnit['USD']`` value, or 0 when no matching reserved
        term exists (before the decorator is applied).
    """
    candidates = sorted_reserved_terms(terms, 'No Upfront')
    if not candidates:
        return 0
    recurring = next(
        dimension
        for dimension in candidates[0]['priceDimensions'].values()
        if dimension['description'] != 'Upfront Fee'
    )
    return recurring['pricePerUnit']['USD']
def sorted_reserved_terms(terms, offering_class_name):
    """Collect the valid reserved terms, ordered by ``offering_class``.

    Args:
        terms: Mapping of pricing terms; its ``Reserved`` entry is scanned.
        offering_class_name: Forwarded to ``is_valid`` as the purchase
            option to accept (despite the parameter's name).

    Returns:
        A list of the terms accepted by ``is_valid``, sorted with
        ``offering_class`` as key, or ``None`` when a ``KeyError`` is raised
        while collecting them (for example when ``Reserved`` is missing).
    """
    try:
        matching = []
        for candidate in terms['Reserved'].values():
            if is_valid(candidate, offering_class_name):
                matching.append(candidate)
    except KeyError:
        return None
    matching.sort(key=offering_class)
    return matching
def heavy_utilization_monthly(terms):
    """Return ``partial_monthly`` evaluated for 'Heavy Utilization' terms.

    Args:
        terms: Mapping of pricing terms for one product.
    """
    # Passed positionally: the decorator around partial_monthly forwards
    # positional arguments only.
    purchase_option = 'Heavy Utilization'
    return partial_monthly(terms, purchase_option)
def heavy_utilization(terms):
    """Return ``partial_upfront`` evaluated for 'Heavy Utilization' terms.

    Args:
        terms: Mapping of pricing terms for one product.
    """
    return partial_upfront(terms, purchase_option_name='Heavy Utilization')
@monthly_value
def partial_monthly(terms, purchase_option_name='Partial Upfront'):
    """Return the recurring USD unit price of a reserved term.

    Uses the first term returned by ``sorted_reserved_terms`` and the first
    of its price dimensions whose description is not ``'Upfront Fee'``.  The
    ``monthly_value`` decorator converts the result to a monthly figure.

    Args:
        terms: Mapping of pricing terms for one product.
        purchase_option_name: Purchase option to look up.

    Returns:
        The ``pricePerUnit['USD']`` value, or 0 when no matching reserved
        term exists (before the decorator is applied).
    """
    matches = sorted_reserved_terms(terms, purchase_option_name)
    if not matches:
        return 0

    def is_recurring(dimension):
        return dimension['description'] != 'Upfront Fee'

    dimensions = matches[0]['priceDimensions']
    recurring = next(filter(is_recurring, dimensions.values()))
    return recurring['pricePerUnit']['USD']
def partial_upfront(terms, purchase_option_name='Partial Upfront'):
    """Return the upfront fee in USD of a reserved term.

    Uses the first term returned by ``sorted_reserved_terms`` and the first
    of its price dimensions whose description is ``'Upfront Fee'``.

    Args:
        terms: Mapping of pricing terms for one product.
        purchase_option_name: Purchase option to look up.

    Returns:
        The ``pricePerUnit['USD']`` value, or 0 when no matching reserved
        term exists.
    """
    matches = sorted_reserved_terms(terms, purchase_option_name)
    if not matches:
        return 0
    upfront_fee = next(
        dimension
        for dimension in matches[0]['priceDimensions'].values()
        if dimension['description'] == 'Upfront Fee'
    )
    return upfront_fee['pricePerUnit']['USD']
def all_upfront(terms):
    """Return the upfront fee in USD of the 'All Upfront' reserved term.

    Uses the first term returned by ``sorted_reserved_terms`` and the first
    of its price dimensions whose description is ``'Upfront Fee'``.

    Args:
        terms: Mapping of pricing terms for one product.

    Returns:
        The ``pricePerUnit['USD']`` value, or 0 when no matching reserved
        term exists.
    """
    matches = sorted_reserved_terms(terms, 'All Upfront')
    if not matches:
        return 0

    def is_upfront_fee(dimension):
        return dimension['description'] == 'Upfront Fee'

    all_dimensions = matches[0]['priceDimensions'].values()
    fee_dimension = next(filter(is_upfront_fee, all_dimensions))
    return fee_dimension['pricePerUnit']['USD']
def price_extractor(purchase_option):
    """Look up the function that extracts the price for *purchase_option*.

    Args:
        purchase_option: One of the purchase option labels in the table
            below.

    Returns:
        The matching extractor function, or ``None`` for an unknown label.
    """
    table = dict((
        ('On Demand', on_demand),
        ('No Upfront', no_upfront),
        ('Partial Monthly', partial_monthly),
        ('Partial Upfront', partial_upfront),
        ('All Upfront', all_upfront),
        ('Heavy Utilization', heavy_utilization),
        ('Heavy Utilization Monthly', heavy_utilization_monthly),
    ))
    return table.get(purchase_option)
@currency_format
def term_price(product_terms, purchase_option):
    """Return the price of *product_terms* for one purchase option.

    The extractor returned by ``price_extractor`` is applied to the terms;
    the ``currency_format`` decorator then formats the result as a currency
    string.

    Args:
        product_terms: Mapping of pricing terms for one product.
        purchase_option: Purchase option label known to ``price_extractor``.

    Raises:
        ValueError: If ``price_extractor`` has no extractor for
            *purchase_option*.
    """
    extractor = price_extractor(purchase_option)
    if extractor is None:
        # Fail with a readable message instead of
        # "'NoneType' object is not callable".
        raise ValueError(f"Unsupported purchase option: {purchase_option!r}")
    return extractor(product_terms)
def get_ec2_price(product):
    """Turn one EC2 price-list entry into a flat row of prices.

    Args:
        product: JSON string with ``product.attributes`` and ``terms``.

    Returns:
        A dict with the keys ``Region``, ``Type`` and ``Platform`` followed
        by one ``term_price`` result per purchase option, in insertion
        order.
    """
    entry = json.loads(product)
    attributes = entry['product']['attributes']
    entry_terms = entry['terms']

    row = {
        'Region': attributes['location'],
        'Type': attributes['instanceType'],
        'Platform': attributes['operatingSystem'],
    }
    purchase_options = (
        'On Demand',
        'No Upfront',
        'Partial Upfront',
        'Partial Monthly',
        'All Upfront',
    )
    for option in purchase_options:
        row[option] = term_price(entry_terms, option)
    return row
def get_rds_price(product):
    """Turn one RDS price-list entry into a flat row of prices.

    Args:
        product: JSON string with ``product.attributes`` and ``terms``.

    Returns:
        A dict with the keys ``Region``, ``Type``, ``Deployment Type``,
        ``Engine`` and ``License`` followed by one ``term_price`` result per
        purchase option, in insertion order.
    """
    entry = json.loads(product)
    attributes = entry['product']['attributes']
    entry_terms = entry['terms']

    row = {
        'Region': attributes['location'],
        'Type': attributes['instanceType'],
        'Deployment Type': attributes['deploymentOption'],
        'Engine': attributes['databaseEngine'],
        'License': attributes['licenseModel'],
    }
    purchase_options = (
        'On Demand',
        'No Upfront',
        'Partial Upfront',
        'Partial Monthly',
        'All Upfront',
    )
    for option in purchase_options:
        row[option] = term_price(entry_terms, option)
    return row
def get_elasticache_price(product):
    """Turn one ElastiCache price-list entry into a flat row of prices.

    The ``Partial Upfront`` and ``Partial Monthly`` columns are filled from
    the 'Heavy Utilization' and 'Heavy Utilization Monthly' purchase
    options respectively.

    Args:
        product: JSON string with ``product.attributes`` and ``terms``.

    Returns:
        A dict with the keys ``Region``, ``Type``, ``Engine``, ``On Demand``,
        ``Partial Upfront`` and ``Partial Monthly``, in that order.
    """
    entry = json.loads(product)
    attributes = entry['product']['attributes']
    entry_terms = entry['terms']

    row = {
        'Region': attributes['location'],
        'Type': attributes['instanceType'],
        'Engine': attributes['cacheEngine'],
    }
    # (output column, purchase option handed to term_price)
    column_sources = (
        ('On Demand', 'On Demand'),
        ('Partial Upfront', 'Heavy Utilization'),
        ('Partial Monthly', 'Heavy Utilization Monthly'),
    )
    for column, option in column_sources:
        row[column] = term_price(entry_terms, option)
    return row
def get_service(service_name):
    """Look up the row-building function for a service code.

    Args:
        service_name: ``'AmazonEC2'``, ``'AmazonRDS'`` or
            ``'AmazonElastiCache'``.

    Returns:
        The matching ``get_*_price`` function, or ``None`` for any other
        service name.
    """
    handlers = dict(
        AmazonEC2=get_ec2_price,
        AmazonRDS=get_rds_price,
        AmazonElastiCache=get_elasticache_price,
    )
    return handlers.get(service_name)
def get_prices(service, filters):
    """Fetch every matching product of *service* and convert it to a price row.

    Relies on the module-level ``client`` (the boto3 pricing client created
    in the ``__main__`` section).

    Args:
        service: Service code, passed as ``ServiceCode`` and used to select
            the row builder through ``get_service``.
        filters: List of filter dicts, passed as ``Filters``.

    Returns:
        A list with one row dict per entry of each page's ``PriceList``.
    """
    get_service_price = get_service(service)
    # A single get_products call returns one page only; walk every page so
    # results beyond the first are not silently dropped.
    paginator = client.get_paginator('get_products')
    prices = []
    for page in paginator.paginate(ServiceCode=service, Filters=filters):
        prices.extend(get_service_price(item) for item in page['PriceList'])
    return prices
def build_filters(filters=None):
    """Convert ``(field, value)`` pairs into pricing filter dicts.

    Args:
        filters: Iterable of two-item ``(field, value)`` pairs.  ``None``
            (the default) is treated as no filters.

    Returns:
        A list with one ``filter_by`` result per pair, in input order.
    """
    # ``None`` replaces the former mutable ``[]`` default argument.
    if filters is None:
        return []
    return [filter_by(field, value) for field, value in filters]
def create_csv(filename, headers):
    """Create (or truncate) *filename* and write the CSV header row.

    Args:
        filename: Path of the CSV file to write.
        headers: Column names, in output order.
    """
    # The csv module requires newline='' so that it controls line endings
    # itself; without it some platforms emit an extra blank line per row.
    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, headers)
        writer.writeheader()
def write_csv(filename, prices):
    """Append price rows to *filename*, creating it with a header if needed.

    Args:
        filename: Path of the CSV file to append to.
        prices: List of row dicts; the keys of the first row define the
            columns.  Nothing is written when the list is empty or ``None``.
    """
    if not prices:
        return
    fields = list(prices[0])
    if not os.path.exists(filename):
        create_csv(filename, fields)
    # newline='' lets the csv module control line endings (see csv docs).
    with open(filename, 'a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writerows(prices)
if __name__ == '__main__':
    # ``client`` must stay a module-level name: get_prices() reads it as a
    # global.
    client = boto3.client('pricing')

    regions = ['South America (Sao Paulo)', 'US East (N. Virginia)']
    ec2_os = ['Linux', 'Windows']
    # (databaseEngine, licenseModel) combinations to export for RDS.
    rds_engines = [
        ('MySQL', 'No License Required'),
        ('Oracle', 'License included'),
        ('Oracle', 'Bring your own license'),
    ]
    elasticache_engines = ['Redis', 'Memcached']

    for region in regions:
        # EC2: one query per operating system.
        for operating_system in ec2_os:
            ec2_filters = [
                ('tenancy', 'shared'),
                ('preInstalledSw', 'NA'),
                ('location', region),
                ('operatingSystem', operating_system),
            ]
            print(f"Getting prices for {operating_system} OS in {region}")
            ec2_prices = get_prices('AmazonEC2', build_filters(ec2_filters))
            write_csv('ec2.csv', ec2_prices)

        # RDS: one query per engine/license combination.
        for engine in rds_engines:
            database_engine, license_model = engine
            rds_filters = [
                ('location', region),
                ('databaseEngine', database_engine),
                ('licenseModel', license_model),
            ]
            print(f"Getting RDS prices for {engine} engine in {region}")
            rds_prices = get_prices('AmazonRDS', build_filters(rds_filters))
            write_csv('rds.csv', rds_prices)

        # ElastiCache: one query per cache engine.
        for engine in elasticache_engines:
            elasticache_filters = [
                ('cacheEngine', engine),
                ('location', region),
            ]
            print(f"Getting ElastiCache prices for {engine} engine in {region}")
            elasticache_prices = get_prices(
                'AmazonElastiCache', build_filters(elasticache_filters)
            )
            write_csv('elasticache.csv', elasticache_prices)

    print("All done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment