Recursively delete all objects under a given path, including objects in subdirectories
import os
import sys
import boto3
from collections import deque
from concurrent.futures import ProcessPoolExecutor  # needed by the driver code below
import multiprocessing
class S3Ops:
    ...
    # List the objects directly under the given path. With Delimiter='/',
    # object keys come back in 'Contents' and subdirectories in 'CommonPrefixes'.
    def listObjects(self, obj: str, maxkeys: int = 1000):
        return self.s3.list_objects(Bucket=self.bucket, Prefix=obj, Delimiter='/', MaxKeys=maxkeys)

    # List the subdirectories directly under the given path.
    # MaxKeys also caps the number of CommonPrefixes in the response, so use
    # the full page size here (the original maxkeys=10 would silently skip any
    # subdirectory beyond the tenth); prefixes with more than 1000
    # subdirectories would still need pagination via 'IsTruncated'.
    def listSubFolders(self, obj: str):
        subfolders = []
        resp = self.listObjects(obj, maxkeys=1000)
        if resp and resp['ResponseMetadata']['HTTPStatusCode'] == 200:
            commonprefixes = resp.get('CommonPrefixes', [])
            if commonprefixes:
                for commonprefix in commonprefixes:
                    subfolders.append(commonprefix['Prefix'])
        return subfolders
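    # The constructor and deleteObjects are elided above ('...'). For
    # reference only, a minimal sketch of deleteObjects, assuming it wraps
    # boto3's delete_objects API (which accepts at most 1000 keys per call,
    # matching the MaxKeys=1000 page size used by listObjects):
    #
    # def deleteObjects(self, keys: list):
    #     return self.s3.delete_objects(
    #         Bucket=self.bucket,
    #         Delete={'Objects': [{'Key': key} for key in keys]},
    #     )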
def pwork(objs_prefix: list):
    s3c = S3Ops()
    # Breadth-first traversal: pop a prefix off the deque, record it in
    # subdirs, then push any subdirectories it contains back onto the deque.
    objs_prefix_mq = deque(objs_prefix)
    subdirs = []  # every directory discovered under the given prefixes
    while len(objs_prefix_mq) > 0:
        elem = objs_prefix_mq.popleft()
        subdirs.append(elem)
        resp_subfolders = s3c.listSubFolders(elem)
        if len(resp_subfolders) > 0:
            objs_prefix_mq.extend(resp_subfolders)  # the original 'cpfmq' was an undefined name
    for subdir in subdirs:
        while True:  # a single listing returns at most MaxKeys objects, so loop until the prefix is empty
            contents = []
            dele_objs = []
            resp_list = s3c.listObjects(subdir)
            if resp_list and resp_list['ResponseMetadata']['HTTPStatusCode'] == 200:
                contents = resp_list.get('Contents', [])
                if contents:
                    for content in contents:
                        dele_objs.append(content['Key'])
                    len_dele_objs = len(dele_objs)
                    print(f"> \033[32m{subdir}: 200, Records: {len_dele_objs}\033[00m", end=', ')
                    resp_del = s3c.deleteObjects(dele_objs)
                    deleted = resp_del.get('Deleted', [])
                    print('Deleted: ' + str(len(deleted)))
                else:
                    # An empty 'Contents' means the prefix has been fully drained.
                    print(f"> {subdir}: 404, Records: {len(contents)}")
                    break
            else:
                print(resp_list)
                break  # bail out on an unexpected response instead of looping forever
# Split the oversized prefix list into chunks of chunk_size and hand each
# chunk to a separate worker process.
# objs_prefix: the list of top-level prefixes, built to match how the data
# actually lands in S3.
chunk_size = 10
chunks = [objs_prefix[i:i + chunk_size] for i in range(0, len(objs_prefix), chunk_size)]
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:  # 'exec' would shadow the builtin
    futures = [executor.submit(pwork, chunk) for chunk in chunks]
    for future in futures:
        future.result()  # surface any exception raised inside a worker
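
# A hedged example of how objs_prefix might be built; the date-partitioned
# layout ('logs/YYYY-MM-DD/') is an assumption for illustration only and
# should be replaced with the actual landing format of the data in S3:
#
# from datetime import date, timedelta
# start = date(2023, 1, 1)
# objs_prefix = [f"logs/{start + timedelta(days=d)}/" for d in range(31)]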