Last active
November 25, 2019 05:22
-
-
Save vikas-git/2d060bfb3b93c31a430564164fa81328 to your computer and use it in GitHub Desktop.
Basic operations for pushing objects to AWS S3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
For more information, see the official boto3 S3 examples:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-examples.html
'''
import boto3 | |
from botocore.errorfactory import ClientError | |
from boto3.s3.transfer import S3Transfer | |
# Thin wrapper around the boto3 S3 client, bound to a single bucket
class BotoScript(object):
    """Convenience wrapper around a boto3 S3 client bound to one bucket.

    Except for ``__init__`` and ``get_bucket_list``, the methods do not
    propagate ``ClientError``: they print the error and signal failure
    through their return value instead.
    """

    def __init__(self, bucket, secret_key, access_key):
        """Store the bucket name and create the S3 client.

        Args:
            bucket: Name of the bucket the other methods operate on.
            secret_key: Passed to boto3 as ``aws_secret_access_key``.
            access_key: Passed to boto3 as ``aws_access_key_id``.

        Note the positional order: the secret key comes BEFORE the access key.
        """
        self.bucket = bucket
        self.client = boto3.client(
            service_name='s3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    def get_bucket_list(self):
        """Return the raw ``list_buckets`` response of the S3 client."""
        return self.client.list_buckets()

    def object_exist(self, dir_path):
        """Return True if ``head_object`` succeeds for key ``dir_path``.

        Any ``ClientError`` is printed and reported as False, so a failure
        for a reason other than a missing key is indistinguishable from
        "does not exist" here.
        """
        try:
            self.client.head_object(Bucket=self.bucket, Key=dir_path)
        except ClientError as e:
            print(e)
            return False
        return True

    def create_dir(self, dir_path):
        """Create an empty object at key ``dir_path`` in the bucket.

        The key is used as given; presumably callers pass a key ending in
        "/" to get a folder-like placeholder, but that is not enforced here.

        Returns:
            True if ``put_object`` succeeded, False if it raised
            ``ClientError`` (the error is printed).
        """
        try:
            self.client.put_object(Bucket=self.bucket, Key=dir_path)
        except ClientError as e:
            print(e)
            # Report the failure to the caller instead of always returning True.
            return False
        return True

    def upload_file(self, file_obj, obj_name=None):
        """Upload a file to the bucket and build a presigned URL for it.

        Args:
            file_obj: Forwarded as the first argument of
                ``S3Transfer.upload_file``; also used as the object key
                when ``obj_name`` is not given.
            obj_name: Object key in the bucket. Defaults to ``file_obj``.

        Returns:
            A ``(success, uri)`` tuple. On any exception the error is
            printed and ``(False, '')`` is returned. On success ``uri`` is
            the result of ``generate_presigned_url``, which is None when
            presigning itself failed with ``ClientError``.
        """
        # If the S3 object name was not specified, reuse the file name.
        if obj_name is None:
            obj_name = file_obj

        try:
            transfer = S3Transfer(self.client)
            transfer.upload_file(file_obj, self.bucket, obj_name)
            uri = self.generate_presigned_url(obj_name)
        except Exception as e:
            # Deliberately broad, best-effort: covers ClientError as well as
            # any other error raised during the transfer.
            print(e)
            return False, ''
        return True, uri

    def generate_presigned_url(self, object_name, bucket_name=None, expiration=3600):
        """Return a presigned ``get_object`` URL for ``object_name``.

        Args:
            object_name: Key of the object to share.
            bucket_name: Bucket holding the object. Defaults to
                ``self.bucket``.
            expiration: Forwarded as ``ExpiresIn`` (3600 by default).

        Returns:
            The value returned by the client's ``generate_presigned_url``,
            or None if it raised ``ClientError`` (the error is printed).
        """
        if bucket_name is None:
            bucket_name = self.bucket

        try:
            return self.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': object_name},
                ExpiresIn=expiration,
            )
        except ClientError as e:
            print(e)
            return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment