Skip to content

Instantly share code, notes, and snippets.

@bluesmilery
Created March 17, 2020 03:58
Show Gist options
  • Save bluesmilery/dbf433f53258517fa536cdd53d53147b to your computer and use it in GitHub Desktop.
Save bluesmilery/dbf433f53258517fa536cdd53d53147b to your computer and use it in GitHub Desktop.
[s3_python_client] #s3
# -*- coding: utf-8 -*-
# ===============================================================
#
# @Create Author : bluesmilery
# @Create Time : 2019/11/25 15:51
# @Description : S3 client
#
# ===============================================================
import hashlib
import os
import os.path as osp
from io import BytesIO
from typing import List, Optional, Tuple

import boto3
from botocore.exceptions import BotoCoreError
from botocore.exceptions import ClientError
class S3Client(object):
    """Convenience wrapper around a boto3 S3 client.

    Methods that return a tuple report failures through a leading status
    value and an error message instead of raising. ``list_buckets`` and
    ``list_objects`` do no error handling of their own, so exceptions from
    the underlying client propagate to the caller.
    """

    def __init__(self, endpoint, ak, sk):
        """Create the underlying boto3 S3 client.

        Args:
            endpoint (str): URL of the S3 endpoint
            ak (str): access key id
            sk (str): secret access key
        """
        self._s3 = boto3.client('s3', endpoint_url=endpoint,
                                aws_access_key_id=ak, aws_secret_access_key=sk)

    @staticmethod
    def _file_md5(file_path, chunk_size=64 * 1024):
        """Return the hex MD5 digest of a local file, read chunk by chunk."""
        digest = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def bucket_exist(self, bucket_name) -> Tuple[bool, str]:
        """Check whether a bucket can be reached with a HEAD request.

        Args:
            bucket_name (str): bucket name
        Returns:
            (True, '') if the HEAD request succeeds; otherwise False and the
            error text. Any client error (not only "not found") yields False.
        """
        try:
            self._s3.head_bucket(Bucket=bucket_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def object_exist(self, bucket_name, object_name) -> Tuple[bool, str]:
        """Check whether an object can be reached with a HEAD request.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
        Returns:
            (True, '') if the HEAD request succeeds; otherwise False and the
            error text. Any client error (not only "not found") yields False.
        """
        try:
            self._s3.head_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def upload_object(self, bucket_name, object_name, object_data, force=False) -> Tuple[bool, str]:
        """Upload in-memory bytes as an object in a single PUT request.

        The MD5 hex digest of the data is stored in the object's user
        metadata under the key ``Md5``. For data larger than the service's
        single-request limit, use ``upload_file`` instead.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
            object_data (bytes): data to upload
            force (bool): overwrite the object if it already exists
        Returns:
            (True, '') on success; otherwise False and the error text
        """
        # The existence probe is only needed when overwriting is not allowed.
        if not force and self.object_exist(bucket_name, object_name)[0]:
            return False, '{} already existed'.format(object_name)
        if not isinstance(object_data, bytes):
            return False, 'The type of object_data should be bytes.'
        md5 = hashlib.md5(object_data).hexdigest()
        try:
            self._s3.put_object(Bucket=bucket_name, Key=object_name,
                                Body=object_data, Metadata={'Md5': md5})
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def upload_file(self, bucket_name, object_name, file_path, force=False) -> Tuple[bool, str]:
        """Upload a local file as an object.

        The MD5 hex digest of the file is stored in the object's user
        metadata under the key ``Md5``.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
            file_path (str): path of the local file to upload
            force (bool): overwrite the object if it already exists
        Returns:
            (True, '') on success; otherwise False and the error text
        """
        if not force and self.object_exist(bucket_name, object_name)[0]:
            return False, '{} already existed'.format(object_name)
        # isfile (not exists): a directory path cannot be opened for hashing.
        if not osp.isfile(file_path):
            return False, 'No such file or directory: {}'.format(file_path)
        try:
            md5 = self._file_md5(file_path)
        except OSError as e:
            # e.g. the file is unreadable or vanished after the check above.
            return False, str(e)
        try:
            self._s3.upload_file(Filename=file_path, Bucket=bucket_name, Key=object_name,
                                 ExtraArgs={'Metadata': {'Md5': md5}})
        # boto3's managed upload reports failures as S3UploadFailedError,
        # a Boto3Error that is neither a BotoCoreError nor a ClientError.
        except (BotoCoreError, ClientError, boto3.exceptions.Boto3Error) as e:
            return False, str(e)
        return True, ''

    def download_object(self, bucket_name, object_name) -> Tuple[bool, str, bytes]:
        """Download an object into memory.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
        Returns:
            (True, '', data) on success; otherwise (False, error text, b'')
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name), b''
        try:
            with BytesIO() as buffer:
                self._s3.download_fileobj(Bucket=bucket_name, Key=object_name, Fileobj=buffer)
                content = buffer.getvalue()
        except (BotoCoreError, ClientError) as e:
            return False, str(e), b''
        return True, '', content

    def download_object_to_file(self, bucket_name, object_name, file_name, force=False) -> Tuple[bool, str]:
        """Download an object to a local file.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
            file_name (str): path of the local file to write
            force (bool): replace ``file_name`` if it already exists
        Returns:
            (True, '') on success; otherwise False and the error text
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name)
        if osp.exists(file_name):
            if not force:
                return False, '{} already existed'.format(file_name)
            try:
                os.remove(file_name)
            except OSError as e:
                # e.g. file_name is a directory or is not removable.
                return False, str(e)
        # dirname is empty for a bare file name; checking it instead of
        # looking for '/' also covers platform-specific path separators.
        parent_dir = osp.dirname(file_name)
        if parent_dir and not osp.exists(parent_dir):
            return False, 'No such file or directory: {}'.format(parent_dir)
        try:
            self._s3.download_file(Bucket=bucket_name, Key=object_name, Filename=file_name)
        # boto3's managed download can raise RetriesExceededError, a
        # Boto3Error that is neither a BotoCoreError nor a ClientError.
        except (BotoCoreError, ClientError, boto3.exceptions.Boto3Error) as e:
            return False, str(e)
        return True, ''

    def delete_object(self, bucket_name, object_name) -> Tuple[bool, str]:
        """Delete an object.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
        Returns:
            (True, '') on success; otherwise False and the error text
            (including when the object cannot be found beforehand)
        """
        is_exist, _ = self.object_exist(bucket_name, object_name)
        if not is_exist:
            return False, '{} do not exist'.format(object_name)
        try:
            self._s3.delete_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return False, str(e)
        return True, ''

    def list_buckets(self) -> List[str]:
        """Return the names of the buckets listed for this ak/sk.

        Each bucket entry in the S3 response looks like:
            {'Name': 'somebucket',
             'CreationDate': datetime.datetime(2019, 10, 14, 2, 47, 15, tzinfo=tzutc())}

        Returns:
            list of bucket names (empty if the response has no buckets)
        """
        response = self._s3.list_buckets()
        return [bucket['Name'] for bucket in response.get('Buckets') or []]

    def list_objects(self, bucket_name) -> List[str]:
        """Return the keys of all objects in a bucket.

        Results are fetched page by page, so buckets holding more keys than
        a single listing response returns are listed completely.

        Each object entry in the S3 response looks like:
            {'Key': 'something.jpg',
             'LastModified': datetime.datetime(2018, 5, 2, 15, 3, 13, tzinfo=tzutc()),
             'ETag': '"6b2fe7a75d371d00810b163657c224ae"',
             'Size': 1674136,
             'StorageClass': 'STANDARD',
             'Owner': {'DisplayName': '1254236265', 'ID': '1254236265'}}

        Args:
            bucket_name (str): bucket name
        Returns:
            list of object keys (empty if the bucket has no objects)
        """
        paginator = self._s3.get_paginator('list_objects')
        keys = []
        for page in paginator.paginate(Bucket=bucket_name):
            # 'Contents' is absent from a page that holds no objects.
            keys.extend(item['Key'] for item in page.get('Contents') or [])
        return keys

    def get_md5(self, bucket_name, object_name) -> Tuple[Optional[str], str]:
        """Return the MD5 recorded for an object.

        The value is taken from the object's user metadata (the ``Md5`` key
        written by ``upload_object``/``upload_file``); if absent, the ETag
        with its surrounding quotes removed is returned instead.
        NOTE(review): the ETag of a multipart upload is not a plain MD5
        digest -- confirm callers can tolerate that fallback.

        Args:
            bucket_name (str): bucket name
            object_name (str): object key
        Returns:
            (md5, '') on success, where md5 is None if neither value is
            present; otherwise (None, error text)
        """
        try:
            res = self._s3.head_object(Bucket=bucket_name, Key=object_name)
        except (BotoCoreError, ClientError) as e:
            return None, str(e)
        metadata = res.get('Metadata') or {}
        # S3 returns user-metadata keys in lowercase, so 'Md5' written at
        # upload time comes back as 'md5'; compare case-insensitively.
        md5 = next((value for key, value in metadata.items()
                    if key.lower() == 'md5'), None)
        if md5 is None:
            etag = res.get('ETag')
            if etag is not None:
                md5 = etag.strip('"')
        return md5, ''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment