Skip to content

Instantly share code, notes, and snippets.

Show Gist options
Save laughingman7743/13fe56d338dcf08a5056e0ff08838972 to your computer and use it in GitHub Desktop.
-- Schema and external table for the AWS "detailed line items with
-- resources and tags" billing report, queried through Amazon Athena.
CREATE SCHEMA IF NOT EXISTS billing;
-- Parquet-backed external table. Data files are produced by the companion
-- Python script and laid out as Hive-style year=/month= partitions under
-- the LOCATION below. Column order must match the order the writer uses.
CREATE EXTERNAL TABLE IF NOT EXISTS billing.detailed_line_items_with_resources_and_tags (
invoice_id string,
payer_account_id string,
linked_account_id string,
record_type string,
record_id string,
product_name string,
rate_id string,
subscription_id string,
pricing_plan_id string,
usage_type string,
operation string,
availability_zone string,
reserved_instance string,
item_description string,
usage_start_date string, -- TODO timestamp
usage_end_date string, -- TODO timestamp
usage_quantity double,
rate double,
cost double,
-- blended_rate double,
-- blended_cost double,
-- unblended_rate double,
-- unblended_cost double,
resource_id string
)
partitioned by(year int, month int)
STORED AS PARQUET
-- Replace YOUR_ATHENA_BUCKET with the bucket the converter uploads to.
LOCATION 's3://YOUR_ATHENA_BUCKET/billing/detailed_line_items_with_resources_and_tags/'
TBLPROPERTIES ("parquet.compress"="SNAPPY");
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
brew install snappy or apt-get install libsnappy-dev or yum install snappy-devel
pip install pyarrow pandas
"""
import os
import zipfile
from collections import OrderedDict
from datetime import datetime
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# AWS account that owns the billing report files (placeholder -- fill in).
AWS_ACCOUNT_ID = 'YOUR_AWS_ACCOUNT_ID'
# Bucket AWS delivers the detailed billing reports to (placeholder).
BILLING_BUCKET = 'YOUR_BILLING_BUCKET'
# Report object name pattern, e.g.
# 123456789012-aws-billing-detailed-line-items-with-resources-and-tags-2017-05.csv.zip
BILLING_FILE = '{account_id}-aws-billing-detailed-line-items-' \
'with-resources-and-tags-{target_month}.csv.zip'
# Bucket holding the Parquet data queried by Athena (placeholder).
ATHENA_BUCKET = 'YOUR_ATHENA_BUCKET'
# Key template for the Hive-style year=/month= partition layout that the
# Athena DDL's PARTITIONED BY (year int, month int) clause expects.
ATHENA_BILLING_TABLE_LOCATION = 'billing/detailed_line_items_' \
'with_resources_and_tags/year={year}/month={month}/{file_name}'
# Scratch directory for the downloaded zip and the generated Parquet file.
TMP_DIR = '/tmp/'
# Target column names (matching the Athena DDL) mapped to the type each
# column is coerced to. Order matters: the keys replace the CSV header
# positionally, so they must line up with the report's column order.
COLUMN_NAMES_AND_DTYPES = OrderedDict([
('invoice_id', str),
('payer_account_id', str),
('linked_account_id', str),
('record_type', str),
('record_id', str),
('product_name', str),
('rate_id', str),
('subscription_id', str),
('pricing_plan_id', str),
('usage_type', str),
('operation', str),
('availability_zone', str),
('reserved_instance', str),
('item_description', str),
('usage_start_date', datetime),
('usage_end_date', datetime),
('usage_quantity', float),
('rate', float),
('cost', float),
# ('blended_rate', float),
# ('blended_cost', float),
# ('unblended_rate', float),
# ('unblended_cost', float),
('resource_id', str),
])
def main():
    """Convert one month of the AWS detailed billing report to Parquet.

    Downloads the zipped CSV report from the billing bucket, renames the
    columns to the snake_case names in the Athena DDL, coerces dtypes,
    writes a Snappy-compressed Parquet file, and uploads it into the
    partitioned (year=/month=) Athena table location on S3.
    """
    target_month = datetime(2017, 5, 1)  # TODO: parameterize the month
    billing_file = BILLING_FILE.format(
        account_id=AWS_ACCOUNT_ID,
        target_month=target_month.strftime('%Y-%m'))

    # download
    zip_file = os.path.join(TMP_DIR, billing_file)
    s3 = boto3.client('s3')
    s3.download_file(BILLING_BUCKET, billing_file, zip_file)

    # transform
    base_file_name = billing_file.split('.')[0]
    snappy_parquet_file = os.path.join(TMP_DIR,
                                       base_file_name + '.parquet.snappy')
    if os.path.exists(snappy_parquet_file):
        os.remove(snappy_parquet_file)
    try:
        with zipfile.ZipFile(zip_file, 'r') as z, \
                z.open(base_file_name + '.csv') as csv_file:
            # Read everything as strings first; dtype coercion happens below.
            df = pd.read_csv(csv_file, dtype='object')
        # Replace the CSV header positionally with the names from the
        # Athena DDL (list() because pandas expects a sequence, not a view).
        df.columns = list(COLUMN_NAMES_AND_DTYPES.keys())
        for column, dtype in COLUMN_NAMES_AND_DTYPES.items():
            if dtype is datetime:
                # BUG FIX: Series.astype(datetime) does not parse strings
                # into timestamps; pd.to_datetime is the correct conversion.
                df[column] = pd.to_datetime(df[column])
            else:
                df[column] = df[column].astype(dtype)
        # NOTE(review): timestamps_to_ms is a legacy pyarrow option (Athena
        # reads millisecond timestamps); newer pyarrow versions use
        # coerce_timestamps='ms' on write_table instead -- confirm the
        # installed pyarrow version before deploying.
        table = pa.Table.from_pandas(df, timestamps_to_ms=True)
        pq.write_table(table, snappy_parquet_file,
                       use_dictionary=False,
                       compression='SNAPPY')

        # upload
        key = ATHENA_BILLING_TABLE_LOCATION.format(
            year=target_month.year,
            month=target_month.month,
            file_name=base_file_name + '.parquet.snappy')
        s3.upload_file(snappy_parquet_file, ATHENA_BUCKET, key)
    finally:
        # Always remove the scratch files, even when transform/upload fails.
        for path in (snappy_parquet_file, zip_file):
            if os.path.exists(path):
                os.remove(path)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment