Last active
July 11, 2024 16:28
-
-
Save Babatunde13/72a742c40e089debb72126e88837d780 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3, os | |
class PyS3: | |
def __init__(self, ACCESS_KEY, SECRET_KEY): | |
self.s3 = boto3.resource('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY) | |
def allow_or_prevent_public_access(self, bucket_name, secure=False): | |
try: | |
self.s3.meta.client.put_public_access_block( | |
Bucket=bucket_name, | |
PublicAccessBlockConfiguration={ | |
'BlockPublicAcls': secure, | |
'IgnorePublicAcls': secure, | |
'BlockPublicPolicy': secure, | |
'RestrictPublicBuckets': secure | |
} | |
) | |
except Exception as e: | |
print(e) | |
def create_bucket(self, bucket_name, secure=False): | |
try: | |
self.s3.create_bucket(Bucket=bucket_name) | |
if secure: | |
allow_or_prevent_public_access(self.s3, bucket_name, secure) | |
except Exception as e: | |
print(e) | |
def upload_file(self, bucket_name, file_name, directory, url, type='file'): | |
remote_path = '' | |
if type == 'file': | |
# check that file exists | |
full_path = os.path.join(directory, file_name) | |
file_exists = os.path.exists(full_path) | |
remote_path = file_name | |
if not file_exists: | |
print('File does not exist') | |
return | |
elif type == 'url': | |
remote_path = file_name | |
try: | |
self.s3.Bucket(bucket_name).upload_file(full_path, remote_path) | |
except Exception as e: | |
print(e) | |
def download_file(self, bucket_name, file_name, directory, local_name): | |
# check if directory exists | |
if not os.path.exists(directory): | |
print('Directory does not exist, creating it') | |
os.makedirs(directory) | |
try: | |
self.s3.Bucket(bucket_name).download_file(file_name, os.path.join(directory, local_name)) | |
except Exception as e: | |
print(e) | |
def delete_files(self, bucket_name, files): | |
objects_to_delete = map(lambda key: { 'Key': key }, files) | |
try: | |
self.s3.Bucket(bucket_name).delete_objects(Delete={ 'Objects': objects_to_delete }) | |
except Exception as e: | |
print(e) | |
def delete_bucket(self, bucket_name): | |
try: | |
self.s3.Bucket(bucket_name).delete() | |
except Exception as e: | |
print(e) | |
def list_files(self, bucket_name): | |
files = [] | |
try: | |
for obj in self.s3.Bucket(bucket_name).objects.all(): | |
files.append(obj.key) | |
except Exception as e: | |
print(e) | |
return files | |
def list_buckets(self): | |
buckets = [] | |
try: | |
for bucket in self.s3.buckets.all(): | |
buckets.append(bucket.name) | |
except Exception as e: | |
print(e) | |
return buckets | |
def get_file_metadata(self, bucket_name, file_name): | |
metadata = {} | |
try: | |
obj = self.s3.Object(bucket_name, file_name) | |
metadata = obj.metadata | |
except Exception as e: | |
print(e) | |
return metadata | |
def copy_file(self, bucket_name, file_name, new_bucket_name): | |
try: | |
self.s3.Bucket(new_bucket_name).copy({'Bucket': bucket_name, 'Key': file_name}, file_name) | |
except Exception as e: | |
print(e) | |
def sync_buckets(self, bucket_name, transient_bucket_name): | |
try: | |
self.s3.meta.client.copy_object(CopySource={'Bucket': bucket_name, 'Key': ''}, Bucket=transient_bucket_name, Key='') | |
except Exception as e: | |
print(e) | |
def set_bucket_public_access(self, bucket_name, access): | |
if access not in ['block', 'unblock']: | |
print('Invalid access type') | |
return | |
try: | |
if access == 'block': | |
allow_or_prevent_public_access(self.s3, bucket_name, True) | |
else: | |
allow_or_prevent_public_access(self.s3, bucket_name, False) | |
except Exception as e: | |
print(e) | |
def generate_presigned_url(self, bucket_name, file_name, expiration=3600): | |
try: | |
url = self.s3.meta.client.generate_presigned_url( | |
'get_object', | |
Params={'Bucket': bucket_name, 'Key': file_name}, | |
ExpiresIn=expiration | |
) | |
print(url) | |
return url | |
except Exception as e: | |
print(e) | |
def main(): | |
ACCESS_KEY = input('Enter your access key: ') | |
SECRET_KEY = input('Enter your secret key: ') | |
s3 = PyS3(ACCESS_KEY, SECRET_KEY) | |
# create a bucket | |
BUCKET_NAME = input('Enter the name of the bucket you want to create: ') | |
s3.create_bucket(BUCKET_NAME) | |
# upload a file | |
file_name = input('Enter the name of the file you want to upload: ') | |
directory = input('Enter the directory of the file you want to upload: ') | |
s3.upload_file(BUCKET_NAME, file_name, directory, '', 'file') | |
# upload a file from a URL | |
file_name = input('Enter the name of the file you want to upload from a URL: ') | |
url = input('Enter the URL of the file you want to upload: ') | |
s3.upload_file(BUCKET_NAME, file_name, '', url, 'url') | |
# download a file | |
file_name = input('Enter the name of the file you want to download: ') | |
directory = input('Enter the directory where you want to download the file: ') | |
local_name = input('Enter the name of the file you want to save locally: ') | |
s3.download_file(BUCKET_NAME, file_name, directory, local_name) | |
# list files in a bucket | |
files = s3.list_files(BUCKET_NAME) | |
print(files) | |
# list buckets | |
buckets = s3.list_buckets() | |
print(buckets) | |
# get file metadata | |
file_name = input('Enter the name of the file you want to get metadata for: ') | |
metadata = s3.set_file_metadata(BUCKET_NAME, file_name) | |
# sync buckets | |
TRANSIENT_BUCKET_NAME = input('Enter the name of the transient bucket: ') | |
s3.sync_buckets(BUCKET_NAME, TRANSIENT_BUCKET_NAME) | |
# delete files | |
files = input('Enter the files you want to delete separated by commas: ') | |
files = files.split(',') | |
s3.delete_files(BUCKET_NAME, files) | |
# delete a bucket | |
s3.delete_bucket(input('Enter the name of the bucket you want to delete: ')) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment