import boto3
import botocore
import datetime
import progressbar
import itertools

# S3 storage types reported by CloudWatch, from
# https://docs.aws.amazon.com/AmazonS3/latest/dev/cloudwatch-monitoring.html
STORAGE_TYPES = [
    "StandardStorage",
    "IntelligentTieringStorage",
    "StandardIAStorage",
    "StandardIASizeOverhead",
    "StandardIAObjectOverhead",
    "OneZoneIAStorage",
    "OneZoneIASizeOverhead",
    "ReducedRedundancyStorage",
    "GlacierStorage",
    "GlacierStagingStorage",
    "GlacierObjectOverhead",
    "GlacierS3ObjectOverhead",
    "DeepArchiveStorage",
    "DeepArchiveObjectOverhead",
    "DeepArchiveS3ObjectOverhead",
    "DeepArchiveStagingStorage"
]

now = datetime.datetime.now()
cw = boto3.client('cloudwatch')
s3client = boto3.client('s3')

# Get a list of all buckets
allbuckets = s3client.list_buckets()

# Grab all the bucket tags for pretty display
print('Getting bucket tags...')
tag_bar = progressbar.ProgressBar()
bucket_tags = {}
for bucket in tag_bar(allbuckets['Buckets']):
    try:
        bucket_tagging = s3client.get_bucket_tagging(Bucket=bucket['Name'])
        bucket_tags[bucket['Name']] = bucket_tagging['TagSet']
    except botocore.exceptions.ClientError:
        # Buckets with no tags raise a client error; treat them as untagged.
        bucket_tags[bucket['Name']] = []

# Look up the storage for each storage type from CloudWatch
print('Calculating storage usage...')
bucket_info = []
start_time = now - datetime.timedelta(days=2)
end_time = now - datetime.timedelta(days=1)
main_bar = progressbar.ProgressBar()
bucket_storages = list(itertools.product(allbuckets['Buckets'], STORAGE_TYPES))
for bucket, storage_type in main_bar(bucket_storages):
    metrics = cw.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[
            {'Name': 'BucketName', 'Value': bucket['Name']},
            {'Name': 'StorageType', 'Value': storage_type}
        ],
        Statistics=['Average'],
        Period=3600,
        StartTime=start_time.isoformat(),
        EndTime=end_time.isoformat()
    )
    # The CloudWatch metrics will have a single datapoint, so we just report on it.
    for item in metrics["Datapoints"]:
        size_bytes = int(item['Average'])
        size_gb = size_bytes / 1024 / 1024 / 1024
        bucket_info.append({
            'name': '{} ({})'.format(bucket['Name'], storage_type),
            'size': size_gb,
            'tags': bucket_tags[bucket['Name']]
        })

# Pretty-print the tags as key=value pairs
def format_tags(tags):
    return ', '.join([
        '{}={}'.format(tag['Key'], tag['Value'])
        for tag in tags
    ])

# Header line for the output going to standard out
format_string = '{: <75} {: >15} {: <100}'
print(format_string.format('Bucket', 'Size (GiB)', 'Tags'))
for bucket in sorted(bucket_info, key=lambda x: x['size'], reverse=True):
    print(format_string.format(
        bucket['name'],
        '{:.2f}'.format(bucket['size']),
        format_tags(bucket['tags'])))
Hi, I want to get an email on a weekly basis that contains the list of all S3 buckets followed by their total bucket size in tabular form. Could you please help me achieve this?
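One possible approach (a rough sketch, not part of the original gist): refactor the script above so that, instead of printing each line, it collects the report into a single string, and then send that string with Amazon SES. The `report_text` parameter and the sender/recipient addresses below are placeholders, and the sender must be a verified SES identity.

```python
import boto3

def send_report(report_text):
    # Sketch: email the bucket-size report via Amazon SES.
    ses = boto3.client('ses')
    ses.send_email(
        Source='s3-reports@example.com',                   # placeholder sender (must be SES-verified)
        Destination={'ToAddresses': ['you@example.com']},  # placeholder recipient
        Message={
            'Subject': {'Data': 'Weekly S3 bucket size report'},
            # Plain-text body keeps the fixed-width table readable
            # in clients that display it with a monospace font.
            'Body': {'Text': {'Data': report_text}}
        }
    )
```

Running it weekly could then be handled outside the script, for example with a cron entry or a Lambda function triggered on an EventBridge schedule.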
I think the list at the top needs to be updated.
(from https://docs.aws.amazon.com/AmazonS3/latest/userguide/metrics-dimensions.html)
Valid storage-type filters: StandardStorage, IntelligentTieringFAStorage, IntelligentTieringIAStorage, IntelligentTieringAAStorage, IntelligentTieringAIAStorage, IntelligentTieringDAAStorage, StandardIAStorage, StandardIASizeOverhead, StandardIAObjectOverhead, OneZoneIAStorage, OneZoneIASizeOverhead, ReducedRedundancyStorage, GlacierInstantRetrievalSizeOverhead, GlacierInstantRetrievalStorage, GlacierStorage, GlacierStagingStorage, GlacierObjectOverhead, GlacierS3ObjectOverhead, DeepArchiveStorage, DeepArchiveObjectOverhead, DeepArchiveS3ObjectOverhead, and DeepArchiveStagingStorage (see the StorageType dimension)
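For convenience, an updated `STORAGE_TYPES` list built from the filters quoted above might look like this (a sketch based on that documentation list, not a change the gist author has made):

```python
# Updated storage-type filters, per the metrics-dimensions documentation quoted above.
STORAGE_TYPES = [
    "StandardStorage",
    "IntelligentTieringFAStorage",
    "IntelligentTieringIAStorage",
    "IntelligentTieringAAStorage",
    "IntelligentTieringAIAStorage",
    "IntelligentTieringDAAStorage",
    "StandardIAStorage",
    "StandardIASizeOverhead",
    "StandardIAObjectOverhead",
    "OneZoneIAStorage",
    "OneZoneIASizeOverhead",
    "ReducedRedundancyStorage",
    "GlacierInstantRetrievalSizeOverhead",
    "GlacierInstantRetrievalStorage",
    "GlacierStorage",
    "GlacierStagingStorage",
    "GlacierObjectOverhead",
    "GlacierS3ObjectOverhead",
    "DeepArchiveStorage",
    "DeepArchiveObjectOverhead",
    "DeepArchiveS3ObjectOverhead",
    "DeepArchiveStagingStorage",
]
```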
Can you please help me with the expected output?