Last active
June 12, 2020 18:34
-
-
Save Ge0rg3/7a25c5bd16f144fc608f999bc195b31c to your computer and use it in GitHub Desktop.
s3scanner.py, but configured for use as a sub-program. For example, all data is sent to stdout, coloured logging is disabled, and everything is combined into one file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This copy of s3scanner has been condensed from two files -> one, and logging has been changed to printing. | |
This way, it can be used as a sub-program instead of a main program | |
You can find the original s3scanner script at https://github.com/sa7mon/S3Scanner | |
The file is split into three components: imports, file 1 (s3utils.py) and file 2 (s3scanner.py) | |
""" | |
# Shadow the builtin print so every call flushes immediately — required when
# this script's stdout is consumed live by a parent process.
import builtins
def print(*args, **kwargs):
    kwargs.setdefault("flush", True)
    builtins.print(*args, **kwargs)
""" | |
Part one: imports | |
""" | |
import argparse | |
import os | |
import sys | |
import re | |
import signal | |
from contextlib import contextmanager | |
import datetime | |
import boto3 | |
from botocore.exceptions import ClientError, NoCredentialsError, HTTPClientError | |
from botocore.handlers import disable_signing | |
from botocore import UNSIGNED | |
from botocore.client import Config | |
import requests | |
""" | |
Part two: s3utils.py | |
""" | |
SIZE_CHECK_TIMEOUT = 30  # Seconds to wait for getBucketSize before giving up
AWS_CREDS_CONFIGURED = True  # Flipped to False at startup if checkAwsCreds() fails
# NOTE(review): ERROR_CODES appears unused in this file — verify before removing.
ERROR_CODES = ['AccessDenied', 'AllAccessDisabled', '[Errno 21] Is a directory:']

# Raised by time_limit() when the SIGALRM timeout fires.
class TimeoutException(Exception): pass
@contextmanager
def time_limit(seconds):
    """Run the wrapped block with a hard wall-clock limit.

    Raises TimeoutException if the block takes longer than `seconds`.
    Relies on SIGALRM, so it is Unix-only and main-thread-only.
    """
    def _on_alarm(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel any pending alarm whether or not the block finished in time.
        signal.alarm(0)
def checkAcl(bucket):
    """
    Attempts to retrieve a bucket's ACL. This also functions as the main
    'check if bucket exists' function: by trying to get the ACL we combine
    two steps and minimize potentially slow network calls.

    :param bucket: Name of bucket to try to get the ACL of
    :return: dict with 2 entries:
             found - bool, whether the bucket was found
             acls  - dict of 'allUsers'/'authUsers' permission lists if the
                     ACL was retrieved, otherwise an error-code string
    """
    s3 = boto3.resource('s3')
    try:
        bucket_acl = s3.BucketAcl(bucket)
        bucket_acl.load()
    except s3.meta.client.exceptions.NoSuchBucket:
        return {"found": False, "acls": {}}
    except ClientError as e:
        code = e.response['Error']['Code']
        if code in ("AccessDenied", "AllAccessDisabled"):
            # Bucket exists, but its ACL is off-limits to us.
            return {"found": True, "acls": code}
        raise
    allUsersGrants = []
    authUsersGrants = []
    for grant in bucket_acl.grants:
        uri = grant['Grantee'].get('URI')
        if uri == "http://acs.amazonaws.com/groups/global/AllUsers":
            allUsersGrants.append(grant['Permission'])
        elif uri == "http://acs.amazonaws.com/groups/global/AuthenticatedUsers":
            authUsersGrants.append(grant['Permission'])
    return {"found": True, "acls": {"allUsers": allUsersGrants, "authUsers": authUsersGrants}}
def checkAwsCreds():
    """
    Checks to see if the user has credentials for AWS properly configured.
    This is essentially a requirement for getting accurate results.

    :return: True if AWS credentials are properly configured, False if not.
    """
    sts = boto3.client('sts')
    try:
        # The response is discarded — we only care whether the call raises
        # NoCredentialsError. (Fix: previously bound unused `response`/`e`.)
        sts.get_caller_identity()
    except NoCredentialsError:
        return False
    return True
def checkBucket(inBucket, argsDump, argsList):
    """Resolve a user-supplied bucket string, check whether the bucket
    exists, print the result, and optionally dump/list its contents.

    :param inBucket: bucket name, domain name, full S3 URL, or 'bucket:region'
    :param argsDump: if True, download all objects via dumpBucket()
    :param argsList: if True, save a file listing via listBucket()
    """
    # Determine what kind of input we're given. Options:
    #   bucket name   i.e. mybucket
    #   domain name   i.e. flaws.cloud
    #   full S3 url   i.e. flaws.cloud.s3-us-west-2.amazonaws.com
    #   bucket:region i.e. flaws.cloud:us-west-2
    if ".amazonaws.com" in inBucket:  # We were given a full s3 url
        bucket = inBucket[:inBucket.rfind(".s3")]
    elif ":" in inBucket:  # We were given a bucket in 'bucket:region' format
        bucket = inBucket.split(":")[0]
    else:  # We were either given a bucket name or domain name
        bucket = inBucket
    valid = checkBucketName(bucket)
    if not valid:
        message = "{0:>11} : {1}".format("[invalid]", bucket)
        print(message)
        return
    if AWS_CREDS_CONFIGURED:
        b = checkAcl(bucket)
    else:
        # No credentials: fall back to an unauthenticated HTTP probe.
        a = checkBucketWithoutCreds(bucket)
        b = {"found": a, "acls": "unknown - no aws creds"}
    if b["found"]:
        size = getBucketSize(bucket)  # Try to get the size of the bucket
        message = "{0:>11} : {1}".format("[found]", bucket + " | " + str(size) + " | ACLs: " + str(b["acls"]))
        print(message)
        if argsDump:
            # Only attempt a dump when listing the bucket wasn't denied.
            if size not in ["AccessDenied", "AllAccessDisabled"]:
                print("{0:>11} : {1} - {2}".format("[found]", bucket, "Attempting to dump...this may take a while."))
                dumpBucket(bucket)
        if argsList:
            if str(b["acls"]) not in ["AccessDenied", "AllAccessDisabled"]:
                listBucket(bucket)
    else:
        message = "{0:>11} : {1}".format("[not found]", bucket)
        print(message)
def checkBucketName(bucket_name):
    """Check that a bucket name is valid per S3 naming conventions.

    :param bucket_name: Name of bucket to check
    :return: bool - whether or not the name is valid
    """
    # Rules encoded in the pattern: 3-63 chars total; only lowercase letters,
    # digits, hyphens and periods; each dot-separated label begins and ends
    # with an alphanumeric; the whole name must not look like an IP address.
    naming_rules = re.compile(
        r'(?=^.{3,63}$)(?!^(\d+\.)+\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)'
    )
    return naming_rules.match(bucket_name) is not None
def checkBucketWithoutCreds(bucketName, triesLeft=2):
    """Probe a bucket with a plain HTTP HEAD request (no AWS credentials).

    :param bucketName: a domain name without protocol (http[s])
    :param triesLeft: remaining retries for transient 503 responses
    :return: bool - whether the bucket appears to exist
    """
    if triesLeft == 0:
        return False
    r = requests.head('http://' + bucketName + '.s3.amazonaws.com')
    status = r.status_code
    if status in (200, 403):
        # 200: listable bucket; 403: bucket exists but LIST is denied.
        return True
    if status == 404:
        # Definitely not a valid bucket name.
        return False
    if status == 503:
        # Transient service error — retry with one fewer attempt left.
        return checkBucketWithoutCreds(bucketName, triesLeft - 1)
    raise ValueError("Got an unhandled status code back: " + str(status) + " for bucket: " + bucketName +
                     ". Please open an issue at: https://github.com/sa7mon/s3scanner/issues and include this info.")
def dumpBucket(bucketName):
    """Download every object in the bucket into ./buckets/<bucketName>/.

    :param bucketName: name of the bucket to dump
    :return: True if the dump completed, False if a ClientError (e.g.
             AccessDenied) interrupted it. The (empty) folder is removed
             when nothing was downloaded.
    """
    global dumped
    # Dump the bucket into bucket folder
    bucketDir = './buckets/' + bucketName
    if not os.path.exists(bucketDir):
        os.makedirs(bucketDir)
    dumped = True
    s3 = boto3.client('s3')
    try:
        if AWS_CREDS_CONFIGURED is False:
            s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucketName):
            # Fix: an empty bucket returns pages with no 'Contents' key,
            # which previously raised KeyError (listBucket already guards).
            if 'Contents' not in page:
                continue
            for item in page['Contents']:
                key = item['Key']
                # Fix: skip zero-byte "directory" placeholder keys, which
                # cannot be downloaded as files.
                if key.endswith('/'):
                    continue
                dest = os.path.join(bucketDir, key)
                # Fix: keys may contain '/' — create intermediate directories
                # so download_file doesn't fail on nested paths.
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                s3.download_file(bucketName, key, dest)
        dumped = True
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            pass  # TODO: Do something with the fact that we were denied
        dumped = False
    finally:
        # Check if folder is empty. If it is, delete it
        if not os.listdir(bucketDir):
            os.rmdir(bucketDir)
    return dumped
def getBucketSize(bucketName):
    """Sum the sizes of all objects in the bucket via list_objects_v2.

    NOTE: assumes the bucket exists; NoSuchBucket is reported as a string
    rather than raised.

    :param bucketName: name of the bucket to size
    :return: human-readable size string, or an error/timeout marker string
    """
    s3 = boto3.client('s3')
    try:
        if AWS_CREDS_CONFIGURED is False:
            s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
        size_bytes = 0
        with time_limit(SIZE_CHECK_TIMEOUT):
            for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucketName):
                # Fix: empty buckets return pages without a 'Contents' key;
                # previously this raised KeyError.
                for item in page.get('Contents', []):
                    size_bytes += item['Size']
        return str(size_bytes) + " bytes"
    except TimeoutException:
        # Fix: the SIGALRM can fire outside botocore's I/O layer, in which
        # case TimeoutException escapes unwrapped; previously it propagated
        # to the caller instead of being reported as a timeout.
        return "Unknown Size - timeout"
    except HTTPClientError as e:
        # botocore wraps exceptions raised during its I/O into HTTPClientError.
        if "Timed out!" in str(e):
            return "Unknown Size - timeout"
        raise
    except ClientError as e:
        code = e.response['Error']['Code']
        if code in ('AccessDenied', 'AllAccessDisabled', 'NoSuchBucket'):
            return code
        raise
def listBucket(bucketName):
    """Save an open bucket's object listing to ./list-buckets/<bucket>.txt.

    :param bucketName: name of the bucket to list
    :return: None on success, 'AccessDenied' if listing was refused
    """
    outFile = './list-buckets/' + bucketName + '.txt'
    if not os.path.exists('./list-buckets/'):
        os.makedirs('./list-buckets/')
    s3 = boto3.client('s3')
    if AWS_CREDS_CONFIGURED is False:
        s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    entries = []
    try:
        # Collect the full listing first so a mid-pagination failure
        # doesn't leave a partial file behind.
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucketName):
            if 'Contents' in page:
                for item in page['Contents']:
                    entries.append(item['LastModified'].strftime('%Y-%m-%d %H:%M:%S') +
                                   " " + str(item['Size']) + " " + item['Key'])
        with open(outFile, 'w') as f:
            f.writelines(line + "\n" for line in entries)
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            return "AccessDenied"
        raise
""" | |
Part 3: s3scanner.py | |
""" | |
######### | |
# | |
# AWS S3scanner - Scans domain names for S3 buckets | |
# | |
# Author: Dan Salmon (twitter.com/bltjetpack, github.com/sa7mon) | |
# Created: 6/19/17 | |
# License: Creative Commons (CC BY-NC-SA 4.0)) | |
# | |
######### | |
CURRENT_VERSION = '1.0.0' | |
# We want to use both formatter classes, so a custom class it is | |
class CustomFormatter(argparse.RawTextHelpFormatter, argparse.RawDescriptionHelpFormatter): | |
pass | |
# Instantiate the parser | |
parser = argparse.ArgumentParser(description='# s3scanner - Find S3 buckets and dump!\n' | |
'#\n' | |
'# Author: Dan Salmon - @bltjetpack, github.com/sa7mon\n', | |
prog='s3scanner', formatter_class=CustomFormatter) | |
# Declare arguments | |
parser.add_argument('-o', '--out-file', dest='outFile', default='./buckets.txt', | |
help='Name of file to save the successfully checked buckets in (Default: buckets.txt)') | |
# parser.add_argument('-c', '--include-closed', dest='includeClosed', action='store_true', default=False, | |
# help='Include found but closed buckets in the out-file') | |
parser.add_argument('-d', '--dump', dest='dump', action='store_true', default=False, | |
help='Dump all found open buckets locally') | |
parser.add_argument('-l', '--list', dest='list', action='store_true', | |
help='Save bucket file listing to local file: ./list-buckets/${bucket}.txt') | |
parser.add_argument('--version', action='version', version=CURRENT_VERSION, | |
help='Display the current version of this tool') | |
parser.add_argument('buckets', help='Name of text file containing buckets to check') | |
# Parse the args | |
args = parser.parse_args() | |
if not checkAwsCreds(): | |
AWS_CREDS_CONFIGURED = False | |
print("Warning: AWS credentials not configured. Open buckets will be shown as closed. Run:" | |
" `aws configure` to fix this.\n") | |
if os.path.isfile(args.buckets): | |
with open(args.buckets, 'r') as f: | |
for line in f: | |
line = line.rstrip() # Remove any extra whitespace | |
checkBucket(line, args.dump, args.list) | |
else: | |
# It's a single bucket | |
checkBucket(args.buckets, args.dump, args.list) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment