Created
June 30, 2016 00:15
-
-
Save SegFaultAX/b20f4706b9075112af90baabc2add14b to your computer and use it in GitHub Desktop.
AWS Billing Data parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import argparse | |
import operator | |
# {'AvailabilityZone': '', | |
# 'BlendedCost': '0.00', | |
# 'BlendedRate': '0.00', | |
# 'InvoiceID': 'xxx', | |
# 'ItemDescription': '$0.05 per GB-month of Magnetic provisioned storage - US East (Northern Virginia)', | |
# 'LinkedAccountId': 'xxx', | |
# 'Operation': 'CreateVolume', | |
# 'PayerAccountId': 'xx', | |
# 'PricingPlanId': 'xx', | |
# 'ProductName': 'Amazon Elastic Compute Cloud', | |
# 'RateId': 'xx', | |
# 'RecordId': 'xx', | |
# 'RecordType': 'LineItem', | |
# 'ReservedInstance': 'N', | |
# 'ResourceId': 'vol-xx', | |
# 'SubscriptionId': 'xx', | |
# 'UnBlendedCost': '0.00', | |
# 'UnBlendedRate': '0.00', | |
# 'UsageEndDate': '2016-05-01 01:00:00', | |
# 'UsageQuantity': '0.00', | |
# 'UsageStartDate': '2016-05-01 00:00:00', | |
# 'UsageType': 'EBS:VolumeUsage', | |
# 'user:ENV': '', | |
# 'user:Environment': '', | |
# 'user:Group': '', | |
# 'user:Name': '', | |
# 'user:Team': '', | |
# 'user:component_id': '', | |
# 'user:env': '', | |
# 'user:name': ''} | |
def parse_args(args=None, parse=True): | |
parser = argparse.ArgumentParser( | |
description="Check billing information") | |
parser.add_argument("file", | |
help="Path to billing report") | |
res = parser.parse_args(args) if parse else None | |
return parser, res | |
def uniq_by(fn, l): | |
seen, res = set(), [] | |
for e in l: | |
key = fn(e) | |
if key not in seen: | |
seen.add(key) | |
res.append(e) | |
return res | |
def transform(xform, d): | |
for k, v in xform.items(): | |
if k in d: | |
d[k] = v(d[k]) | |
return d | |
def pluck(d, *keys): | |
return { k: d[k] for k in keys if k in d } | |
def aggregate(records, *keys): | |
res = { k: 0.0 for k in keys } | |
for rec in records: | |
for k in keys: | |
res[k] += rec.get(k, 0.0) | |
return res | |
def main(): | |
from pprint import pprint as pp | |
_parser, args = parse_args() | |
with open(args.file) as f: | |
reader = csv.DictReader(f) | |
data = list(reader) | |
lineitems = [e for e in data if e["RecordType"] == "LineItem"] | |
uniq_lineitems = uniq_by(operator.itemgetter("RecordId"), lineitems) | |
xform = {"BlendedCost": float, "UnBlendedCost": float} | |
billing = [transform(xform, pluck(e, *xform.keys())) for e in lineitems] | |
costs = aggregate(billing, *xform.keys()) | |
uniq_billing = [transform(xform, pluck(e, *xform.keys())) for e in uniq_lineitems] | |
uniq_costs = aggregate(uniq_billing, *xform.keys()) | |
pp(costs) | |
pp(uniq_costs) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment