@rwiggins
Last active July 7, 2023 01:53
Lists the storage size (for all storage types) and tags of all S3 buckets in an account using CloudWatch's GetMetricStatistics. Uses the default AWS credentials in your environment. (See awscli configuration for more information.)
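The script picks up whatever credentials boto3 resolves from your environment. If you want to point it at a specific profile or region instead, one option (a minimal sketch; the profile and region names below are placeholders) is to build the two clients from an explicit boto3.Session:

import boto3

# Sketch: build clients from an explicit session instead of the default
# credential chain. 'my-profile' and 'us-east-1' are placeholders.
session = boto3.Session(profile_name='my-profile', region_name='us-east-1')
cw = session.client('cloudwatch')
s3client = session.client('s3')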
import boto3
import botocore
import datetime
import progressbar
import itertools
# from https://docs.aws.amazon.com/AmazonS3/latest/dev/cloudwatch-monitoring.html
STORAGE_TYPES = [
    "StandardStorage",
    "IntelligentTieringStorage",
    "StandardIAStorage",
    "StandardIASizeOverhead",
    "StandardIAObjectOverhead",
    "OneZoneIAStorage",
    "OneZoneIASizeOverhead",
    "ReducedRedundancyStorage",
    "GlacierStorage",
    "GlacierStagingStorage",
    "GlacierObjectOverhead",
    "GlacierS3ObjectOverhead",
    "DeepArchiveStorage",
    "DeepArchiveObjectOverhead",
    "DeepArchiveS3ObjectOverhead",
    "DeepArchiveStagingStorage",
]
now = datetime.datetime.now()
cw = boto3.client('cloudwatch')
s3client = boto3.client('s3')
# Get a list of all buckets
allbuckets = s3client.list_buckets()
# Grab all the bucket tags for pretty display
print('Getting bucket tags...')
tag_bar = progressbar.ProgressBar()
bucket_tags = {}
for bucket in tag_bar(allbuckets['Buckets']):
    try:
        bucket_tagging = s3client.get_bucket_tagging(Bucket=bucket['Name'])
        bucket_tags[bucket['Name']] = bucket_tagging['TagSet']
    except botocore.exceptions.ClientError:
        # get_bucket_tagging raises a ClientError (NoSuchTagSet) for untagged buckets
        bucket_tags[bucket['Name']] = []
# Look up the storage for each storage type from CloudWatch
print('Calculating storage usage...')
bucket_info = []
start_time = now - datetime.timedelta(days=2)
end_time = now - datetime.timedelta(days=1)
main_bar = progressbar.ProgressBar()
bucket_storages = list(itertools.product(allbuckets['Buckets'], STORAGE_TYPES))
for bucket, storage_type in main_bar(bucket_storages):
    metrics = cw.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[
            {'Name': 'BucketName', 'Value': bucket['Name']},
            {'Name': 'StorageType', 'Value': storage_type}
        ],
        Statistics=['Average'],
        Period=3600,
        StartTime=start_time.isoformat(),
        EndTime=end_time.isoformat()
    )
    # BucketSizeBytes is reported once per day, so the one-day window above
    # yields at most a single datapoint; report on whatever comes back.
    for item in metrics["Datapoints"]:
        size_bytes = int(item['Average'])
        size_gb = size_bytes / 1024 / 1024 / 1024
        bucket_info.append({
            'name': '{} ({})'.format(bucket['Name'], storage_type),
            'size': size_gb,
            'tags': bucket_tags[bucket['Name']]
        })
# Pretty looking tags
def format_tags(tags):
    return ', '.join([
        '{}={}'.format(tag['Key'], tag['Value'])
        for tag in tags
    ])
# Header Line for the output going to standard out
format_string = '{: <75} {: >15} {: <100}'
print(format_string.format('Bucket', 'Size (GiB)', 'Tags'))
for bucket in sorted(bucket_info, key=lambda x: x['size'], reverse=True):
    print(format_string.format(
        bucket['name'],
        '{:.2f}'.format(bucket['size']),
        format_tags(bucket['tags'])))
@zmokhtar

I think the list at the top needs to be updated.

(from https://docs.aws.amazon.com/AmazonS3/latest/userguide/metrics-dimensions.html)

Valid storage-type filters: StandardStorage, IntelligentTieringFAStorage, IntelligentTieringIAStorage, IntelligentTieringAAStorage, IntelligentTieringAIAStorage, IntelligentTieringDAAStorage, StandardIAStorage, StandardIASizeOverhead, StandardIAObjectOverhead, OneZoneIAStorage, OneZoneIASizeOverhead, ReducedRedundancyStorage, GlacierInstantRetrievalSizeOverhead, GlacierInstantRetrievalStorage, GlacierStorage, GlacierStagingStorage, GlacierObjectOverhead, GlacierS3ObjectOverhead, DeepArchiveStorage, DeepArchiveObjectOverhead, DeepArchiveS3ObjectOverhead, and DeepArchiveStagingStorage (see the StorageType dimension)
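Applied to the script above, the updated list would look like this (a sketch copied verbatim from the filters quoted above; note that the old IntelligentTieringStorage entry is replaced by the five FA/IA/AA/AIA/DAA variants):

# Updated per https://docs.aws.amazon.com/AmazonS3/latest/userguide/metrics-dimensions.html
STORAGE_TYPES = [
    "StandardStorage",
    "IntelligentTieringFAStorage",
    "IntelligentTieringIAStorage",
    "IntelligentTieringAAStorage",
    "IntelligentTieringAIAStorage",
    "IntelligentTieringDAAStorage",
    "StandardIAStorage",
    "StandardIASizeOverhead",
    "StandardIAObjectOverhead",
    "OneZoneIAStorage",
    "OneZoneIASizeOverhead",
    "ReducedRedundancyStorage",
    "GlacierInstantRetrievalSizeOverhead",
    "GlacierInstantRetrievalStorage",
    "GlacierStorage",
    "GlacierStagingStorage",
    "GlacierObjectOverhead",
    "GlacierS3ObjectOverhead",
    "DeepArchiveStorage",
    "DeepArchiveObjectOverhead",
    "DeepArchiveS3ObjectOverhead",
    "DeepArchiveStagingStorage",
]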
