Skip to content

Instantly share code, notes, and snippets.

@Babatunde13
Last active July 11, 2024 16:28
Show Gist options
  • Save Babatunde13/72a742c40e089debb72126e88837d780 to your computer and use it in GitHub Desktop.
Save Babatunde13/72a742c40e089debb72126e88837d780 to your computer and use it in GitHub Desktop.
import os
import urllib.request

import boto3
class PyS3:
    """Thin convenience wrapper around a boto3 S3 resource.

    Every method is best-effort: errors raised while talking to S3 are
    printed instead of propagated, so callers cannot rely on exceptions to
    detect a failed operation.
    """

    # The S3 DeleteObjects API accepts at most this many keys per request.
    _MAX_KEYS_PER_DELETE = 1000

    def __init__(self, ACCESS_KEY, SECRET_KEY):
        """Build the boto3 S3 resource from explicit credentials."""
        self.s3 = boto3.resource(
            's3',
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
        )

    def allow_or_prevent_public_access(self, bucket_name, secure=False):
        """Set all four public-access-block flags of a bucket to ``secure``.

        Args:
            bucket_name: Name of the bucket to configure.
            secure: ``True`` blocks public access, ``False`` lifts the block.
        """
        try:
            self.s3.meta.client.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': secure,
                    'IgnorePublicAcls': secure,
                    'BlockPublicPolicy': secure,
                    'RestrictPublicBuckets': secure,
                },
            )
        except Exception as e:
            print(e)

    def create_bucket(self, bucket_name, secure=False):
        """Create a bucket and, when ``secure`` is true, block public access.

        NOTE(review): no ``CreateBucketConfiguration`` is passed, so this
        presumably only works when the session's region is us-east-1 --
        confirm against the deployment's region settings.

        Args:
            bucket_name: Name of the bucket to create.
            secure: When true, public access is blocked after creation.
        """
        try:
            self.s3.create_bucket(Bucket=bucket_name)
        except Exception as e:
            print(e)
            return
        if secure:
            # Must be called as a method; the helper prints its own errors.
            self.allow_or_prevent_public_access(bucket_name, secure)

    def upload_file(self, bucket_name, file_name, directory, url, type='file'):
        """Upload a local file or the content behind a URL to a bucket.

        Args:
            bucket_name: Destination bucket.
            file_name: Object key to store the content under; for
                ``type='file'`` it is also the local file name.
            directory: Local directory containing ``file_name``
                (only used for ``type='file'``).
            url: http(s) URL to fetch (only used for ``type='url'``).
            type: ``'file'`` for a local upload, ``'url'`` for a remote one.
        """
        remote_path = file_name
        if type == 'file':
            full_path = os.path.join(directory, file_name)
            if not os.path.exists(full_path):
                print('File does not exist')
                return
            try:
                self.s3.Bucket(bucket_name).upload_file(full_path, remote_path)
            except Exception as e:
                print(e)
        elif type == 'url':
            # Only fetch web URLs: urlopen would otherwise also accept
            # schemes such as file://, exposing local files.
            is_web_url = isinstance(url, str) and url.lower().startswith(
                ('http://', 'https://')
            )
            if not is_web_url:
                print('Invalid URL: only http(s) URLs are supported')
                return
            try:
                with urllib.request.urlopen(url) as response:
                    # The response is a binary file-like object, which is
                    # what upload_fileobj expects.
                    self.s3.Bucket(bucket_name).upload_fileobj(response, remote_path)
            except Exception as e:
                print(e)
        else:
            print('Invalid upload type')

    def download_file(self, bucket_name, file_name, directory, local_name):
        """Download an object into ``directory`` under the name ``local_name``.

        The directory is created when it does not exist. An empty
        ``directory`` means the current working directory.

        Raises:
            OSError: If the target directory cannot be created.
        """
        # os.makedirs('') raises, so only create a directory when one is named.
        if directory and not os.path.exists(directory):
            print('Directory does not exist, creating it')
            os.makedirs(directory, exist_ok=True)
        try:
            self.s3.Bucket(bucket_name).download_file(
                file_name, os.path.join(directory, local_name)
            )
        except Exception as e:
            print(e)

    def delete_files(self, bucket_name, files):
        """Delete the given object keys from a bucket.

        Args:
            bucket_name: Bucket to delete from.
            files: Iterable of object keys; an empty iterable is a no-op.
        """
        # boto3 validates ``Objects`` as a list, so a lazy map object is rejected.
        objects_to_delete = [{'Key': key} for key in files]
        if not objects_to_delete:
            return
        try:
            bucket = self.s3.Bucket(bucket_name)
            step = self._MAX_KEYS_PER_DELETE
            for start in range(0, len(objects_to_delete), step):
                batch = objects_to_delete[start:start + step]
                bucket.delete_objects(Delete={'Objects': batch})
        except Exception as e:
            print(e)

    def delete_bucket(self, bucket_name):
        """Delete a bucket."""
        try:
            self.s3.Bucket(bucket_name).delete()
        except Exception as e:
            print(e)

    def list_files(self, bucket_name):
        """Return the keys of all objects in a bucket.

        If listing fails part-way, the keys collected so far are returned.
        """
        files = []
        try:
            for obj in self.s3.Bucket(bucket_name).objects.all():
                files.append(obj.key)
        except Exception as e:
            print(e)
        return files

    def list_buckets(self):
        """Return the names of all buckets visible to the credentials.

        If listing fails part-way, the names collected so far are returned.
        """
        buckets = []
        try:
            for bucket in self.s3.buckets.all():
                buckets.append(bucket.name)
        except Exception as e:
            print(e)
        return buckets

    def get_file_metadata(self, bucket_name, file_name):
        """Return the ``metadata`` attribute of an object, or ``{}`` on error."""
        metadata = {}
        try:
            metadata = self.s3.Object(bucket_name, file_name).metadata
        except Exception as e:
            print(e)
        return metadata

    def copy_file(self, bucket_name, file_name, new_bucket_name):
        """Copy one object to ``new_bucket_name`` under the same key."""
        try:
            self.s3.Bucket(new_bucket_name).copy(
                {'Bucket': bucket_name, 'Key': file_name}, file_name
            )
        except Exception as e:
            print(e)

    def sync_buckets(self, bucket_name, transient_bucket_name):
        """Copy every object of ``bucket_name`` into ``transient_bucket_name``.

        This is a one-way copy that keeps the original keys; objects that
        exist only in the target bucket are left untouched.
        """
        try:
            target = self.s3.Bucket(transient_bucket_name)
            # An empty key is not a valid object, so copy object by object.
            for obj in self.s3.Bucket(bucket_name).objects.all():
                target.copy({'Bucket': bucket_name, 'Key': obj.key}, obj.key)
        except Exception as e:
            print(e)

    def set_bucket_public_access(self, bucket_name, access):
        """Block or unblock public access to a bucket.

        Args:
            bucket_name: Bucket to configure.
            access: Either ``'block'`` or ``'unblock'``; anything else is
                reported as invalid and ignored.
        """
        if access not in ('block', 'unblock'):
            print('Invalid access type')
            return
        # The helper prints its own errors, so no extra try/except is needed.
        self.allow_or_prevent_public_access(bucket_name, access == 'block')

    def generate_presigned_url(self, bucket_name, file_name, expiration=3600):
        """Create, print and return a presigned ``get_object`` URL.

        Args:
            bucket_name: Bucket holding the object.
            file_name: Key of the object.
            expiration: Passed to boto3 as ``ExpiresIn``.

        Returns:
            The URL, or ``None`` when generation fails.
        """
        try:
            url = self.s3.meta.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': file_name},
                ExpiresIn=expiration,
            )
        except Exception as e:
            print(e)
            return None
        print(url)
        return url
def main():
    """Interactively walk through the PyS3 operations.

    Prompts for credentials and names on stdin, then in order: creates a
    bucket, uploads a local file, uploads from a URL, downloads a file,
    lists files and buckets, shows an object's metadata, syncs to a second
    bucket, deletes files and finally deletes a bucket.
    """
    ACCESS_KEY = input('Enter your access key: ')
    SECRET_KEY = input('Enter your secret key: ')
    s3 = PyS3(ACCESS_KEY, SECRET_KEY)

    # create a bucket
    BUCKET_NAME = input('Enter the name of the bucket you want to create: ')
    s3.create_bucket(BUCKET_NAME)

    # upload a file
    file_name = input('Enter the name of the file you want to upload: ')
    directory = input('Enter the directory of the file you want to upload: ')
    s3.upload_file(BUCKET_NAME, file_name, directory, '', 'file')

    # upload a file from a URL
    file_name = input('Enter the name of the file you want to upload from a URL: ')
    url = input('Enter the URL of the file you want to upload: ')
    s3.upload_file(BUCKET_NAME, file_name, '', url, 'url')

    # download a file
    file_name = input('Enter the name of the file you want to download: ')
    directory = input('Enter the directory where you want to download the file: ')
    local_name = input('Enter the name of the file you want to save locally: ')
    s3.download_file(BUCKET_NAME, file_name, directory, local_name)

    # list files in a bucket
    files = s3.list_files(BUCKET_NAME)
    print(files)

    # list buckets
    buckets = s3.list_buckets()
    print(buckets)

    # get file metadata (the method is get_file_metadata; there is no setter)
    file_name = input('Enter the name of the file you want to get metadata for: ')
    metadata = s3.get_file_metadata(BUCKET_NAME, file_name)
    print(metadata)

    # sync buckets
    TRANSIENT_BUCKET_NAME = input('Enter the name of the transient bucket: ')
    s3.sync_buckets(BUCKET_NAME, TRANSIENT_BUCKET_NAME)

    # delete files
    files = input('Enter the files you want to delete separated by commas: ')
    # Strip the whitespace users type after commas and drop empty entries,
    # otherwise ' b.txt' or '' would be sent to S3 as object keys.
    files = [name.strip() for name in files.split(',') if name.strip()]
    s3.delete_files(BUCKET_NAME, files)

    # delete a bucket
    s3.delete_bucket(input('Enter the name of the bucket you want to delete: '))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment