Created
March 17, 2020 03:58
-
-
Save bluesmilery/dbf433f53258517fa536cdd53d53147b to your computer and use it in GitHub Desktop.
[s3_python_client] #s3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# ===============================================================
#
#   @Create Author : bluesmilery
#   @Create Time   : 2019/11/25 15:51
#   @Description   : S3 client
#
# ===============================================================
import hashlib
import os
import os.path as osp
from io import BytesIO
from typing import List, Optional, Tuple

import boto3
from boto3.exceptions import Boto3Error
from botocore.exceptions import BotoCoreError
from botocore.exceptions import ClientError
class S3Client(object):
    """Thin convenience wrapper around a boto3 S3 client.

    Most methods report failure through their return value (a ``False``/``None``
    status plus an error message) instead of raising. ``list_buckets`` and
    ``list_objects`` return plain lists and let boto3 errors propagate.
    """

    # Read size used when hashing local files.
    _MD5_CHUNK_SIZE = 1024 * 1024

    def __init__(self, endpoint, ak, sk):
        """Create the underlying boto3 client.

        Args:
            endpoint (str): URL of the S3 (or S3-compatible) endpoint.
            ak (str): access key id.
            sk (str): secret access key.
        """
        self._s3 = boto3.client('s3', endpoint_url=endpoint,
                                aws_access_key_id=ak, aws_secret_access_key=sk)

    @staticmethod
    def _file_md5(file_path, chunk_size=1024 * 1024):
        """Return the hex MD5 digest of a local file, read in chunks."""
        digest = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def bucket_exist(self, bucket_name) -> Tuple[bool, str]:
        """Check whether a bucket exists.

        Args:
            bucket_name (str): bucket name.

        Returns:
            ``(True, '')`` if the HEAD request succeeds, otherwise
            ``(False, reason)``. Any request failure is reported as
            "does not exist".
        """
        try:
            self._s3.head_bucket(Bucket=bucket_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def object_exist(self, bucket_name, object_name) -> Tuple[bool, str]:
        """Check whether an object exists in a bucket.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.

        Returns:
            ``(True, '')`` if the HEAD request succeeds, otherwise
            ``(False, reason)``. Any request failure is reported as
            "does not exist".
        """
        try:
            self._s3.head_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def upload_object(self, bucket_name, object_name, object_data, force=False) -> Tuple[bool, str]:
        """Upload in-memory bytes as an object.

        If the data is larger than the bucket's single-request size limit,
        use ``upload_file`` instead. The MD5 of the data is stored in the
        object's user metadata under the key ``Md5``.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.
            object_data (bytes): payload to upload.
            force (bool): overwrite the object if it already exists.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)``.
        """
        # The existence check only matters when overwriting is not allowed,
        # so skip the extra HEAD request when force is set.
        if not force:
            is_exist, _ = self.object_exist(bucket_name, object_name)
            if is_exist:
                return False, '{} already existed'.format(object_name)
        if not isinstance(object_data, bytes):
            return False, 'The type of object_data should be bytes.'
        md5 = hashlib.md5(object_data).hexdigest()
        try:
            self._s3.put_object(Bucket=bucket_name, Key=object_name,
                                Body=object_data, Metadata={'Md5': md5})
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def upload_file(self, bucket_name, object_name, file_path, force=False) -> Tuple[bool, str]:
        """Upload a local file as an object.

        The MD5 of the file is stored in the object's user metadata under
        the key ``Md5``.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.
            file_path (str): path of the local file to upload.
            force (bool): overwrite the object if it already exists.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)``.
        """
        if not force:
            is_exist, _ = self.object_exist(bucket_name, object_name)
            if is_exist:
                return False, '{} already existed'.format(object_name)
        if not osp.exists(file_path):
            return False, 'No such file or directory: {}'.format(file_path)
        if not osp.isfile(file_path):
            # A directory (or other non-file) cannot be hashed or uploaded.
            return False, 'Not a regular file: {}'.format(file_path)
        try:
            md5 = self._file_md5(file_path, self._MD5_CHUNK_SIZE)
        except OSError as e:
            return False, str(e)
        try:
            self._s3.upload_file(Filename=file_path, Bucket=bucket_name, Key=object_name,
                                 ExtraArgs={'Metadata': {'Md5': md5}})
        # boto3's managed transfer wraps failures in S3UploadFailedError,
        # which derives from Boto3Error rather than the botocore classes.
        except (BotoCoreError, ClientError, Boto3Error) as e:
            return False, str(e)
        return True, ''

    def download_object(self, bucket_name, object_name) -> Tuple[bool, str, bytes]:
        """Download an object into memory.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.

        Returns:
            ``(True, '', data)`` on success, otherwise ``(False, reason, b'')``.
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name), b''
        try:
            with BytesIO() as buf:
                self._s3.download_fileobj(Bucket=bucket_name, Key=object_name, Fileobj=buf)
                content = buf.getvalue()
        except (BotoCoreError, ClientError) as e:
            return False, str(e), b''
        return True, '', content

    def download_object_to_file(self, bucket_name, object_name, file_name, force=False) -> Tuple[bool, str]:
        """Download an object to a local file.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.
            file_name (str): path of the local file to write.
            force (bool): overwrite ``file_name`` if it already exists.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)``.
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name)
        if osp.exists(file_name) and not force:
            return False, '{} already existed'.format(file_name)
        parent_dir = osp.dirname(file_name)
        if parent_dir and not osp.exists(parent_dir):
            return False, 'No such file or directory: {}'.format(parent_dir)
        # The existing file is intentionally NOT removed up front: boto3
        # downloads to a temporary file and renames it over the target, so
        # a failed download leaves the previous file intact.
        try:
            self._s3.download_file(Bucket=bucket_name, Key=object_name, Filename=file_name)
        except (BotoCoreError, ClientError, Boto3Error, OSError) as e:
            return False, str(e)
        return True, ''

    def delete_object(self, bucket_name, object_name) -> Tuple[bool, str]:
        """Delete an object from a bucket.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)``. Deleting
            an object that cannot be found is reported as a failure.
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name)
        try:
            self._s3.delete_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def list_buckets(self) -> List[str]:
        """List the names of the buckets owned by this ak/sk.

        Each bucket entry in the S3 response looks like::

            {'Name': 'somebucket',
             'CreationDate': datetime.datetime(2019, 10, 14, 2, 47, 15, tzinfo=tzutc())}

        Returns:
            A list of bucket names. boto3 errors are not caught here.
        """
        response = self._s3.list_buckets()
        return [bucket['Name'] for bucket in response.get('Buckets') or []]

    def list_objects(self, bucket_name) -> List[str]:
        """List the keys of all objects in a bucket.

        Each object entry in the S3 response looks like::

            {'Key': 'something.jpg',
             'LastModified': datetime.datetime(2018, 5, 2, 15, 3, 13, tzinfo=tzutc()),
             'ETag': '"6b2fe7a75d371d00810b163657c224ae"',
             'Size': 1674136,
             'StorageClass': 'STANDARD',
             'Owner': {'DisplayName': '1254236265', 'ID': '1254236265'}}

        Args:
            bucket_name (str): bucket name.

        Returns:
            A list of object keys. boto3 errors are not caught here.
        """
        # A single list_objects call returns only one page of results, so
        # walk every page to honour the "all objects" contract.
        paginator = self._s3.get_paginator('list_objects')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            objects.extend(item['Key'] for item in page.get('Contents') or [])
        return objects

    def get_md5(self, bucket_name, object_name) -> Tuple[Optional[str], str]:
        """Return the MD5 recorded for an object.

        The value is taken from the object's ``Md5`` user metadata when
        present; otherwise the ETag (with surrounding quotes removed) is used.

        NOTE(review): for multipart uploads the S3 ETag is not a plain MD5 of
        the content, so the fallback value may not be a real digest.

        Args:
            bucket_name (str): bucket name.
            object_name (str): object key.

        Returns:
            ``(md5, '')`` on success (``md5`` is ``None`` if neither metadata
            nor ETag is available), otherwise ``(None, reason)``.
        """
        try:
            res = self._s3.head_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return None, str(e)
        # User metadata keys come back lower-cased from S3, so match the
        # 'Md5' key written at upload time case-insensitively.
        for key, value in (res.get('Metadata') or {}).items():
            if key.lower() == 'md5' and value is not None:
                return value, ''
        etag = res.get('ETag', None)
        if etag is not None:
            etag = etag.strip('"')
        return etag, ''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment