Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save c51303/2613b856467dfa92c4fb15189d35ff84 to your computer and use it in GitHub Desktop.
Save c51303/2613b856467dfa92c4fb15189d35ff84 to your computer and use it in GitHub Desktop.
How to delete a folder or files recursively from an S3 bucket?

递归删除指定路径下所有对象,包括子目录下的对象

import os
import sys
import boto3
from collections import deque
import multiprocessing



class S3Ops:
    """Thin helper around an S3 client for listing objects and sub-folders.

    NOTE(review): the constructor is elided in this snippet; instances are
    assumed to carry ``self.s3`` (a boto3 S3 client) and ``self.bucket``
    (the target bucket name) — confirm against the full source.
    """
    ...

    def listObjects(self, obj: str, maxkeys: int = 1000):
        """List up to *maxkeys* objects directly under prefix *obj* ('/'-delimited)."""
        return self.s3.list_objects(
            Bucket=self.bucket,
            Prefix=obj,
            Delimiter='/',
            MaxKeys=maxkeys,
        )

    def listSubFolders(self, obj: str):
        """Return the immediate sub-folder prefixes under *obj* (empty list on failure)."""
        resp = self.listObjects(obj, maxkeys=10)
        # Guard clause: anything but a well-formed 200 yields no sub-folders.
        if not resp or resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            return []
        return [cp['Prefix'] for cp in resp.get('CommonPrefixes', [])]


def pwork(objs_prefix: list):
    """Recursively delete every object under each prefix in *objs_prefix*.

    First walks the prefix tree breadth-first to collect all sub-directories,
    then repeatedly lists-and-deletes each one until it is empty (working
    around the MaxKeys cap on a single listing).
    """
    s3c = S3Ops()

    # Breadth-first discovery: pop a prefix, record it, enqueue its
    # sub-folders. Every visited prefix ends up in `subdirs`.
    objs_prefix_mq = deque(objs_prefix)
    subdirs = []  # all prefixes whose contents must be deleted
    while objs_prefix_mq:
        elem = objs_prefix_mq.popleft()
        subdirs.append(elem)
        resp_subfolders = s3c.listSubFolders(elem)
        if resp_subfolders:
            # BUG FIX: was `cpfmq.extend(...)` — an undefined name that
            # raised NameError as soon as any prefix had sub-folders.
            objs_prefix_mq.extend(resp_subfolders)

    for subdir in subdirs:
        # One list_objects call is capped at MaxKeys records, so keep
        # listing and deleting until the prefix comes back empty.
        while True:
            resp_list = s3c.listObjects(subdir)
            if resp_list and resp_list['ResponseMetadata']['HTTPStatusCode'] == 200:
                contents = resp_list.get('Contents', [])
                if contents:
                    dele_objs = [content['Key'] for content in contents]
                    len_dele_objs = len(dele_objs)
                    print(f"> \033[32m{subdir}: 200, Records: {len_dele_objs}\033[00m", end=', ')
                    resp_del = s3c.deleteObjects(dele_objs)
                    deleted = resp_del.get('Deleted', [])
                    print('Deleted: ' + str(len(deleted)))
                else:
                    # No Contents left: this prefix is fully deleted.
                    print(f"> {subdir}: 404, Records: {len(contents)}")
                    break
            else:
                # BUG FIX: an error/unexpected response previously printed
                # forever without exiting the loop — break to avoid an
                # infinite loop on a non-200 listing.
                print(resp_list)
                break


# Split the (potentially huge) prefix list into chunk_size-sized sub-lists
# and fan each chunk out to a separate worker process.
# NOTE(review): `objs_prefix` is expected to be built earlier from the
# actual data layout in S3 — it is not defined in this snippet; confirm.
chunk_size = 10
chunks = [objs_prefix[i:i + chunk_size]
          for i in range(0, len(objs_prefix), chunk_size)]

# Renamed `exec` -> `executor` (was shadowing the builtin). The original
# also overwrote `futures` on every iteration, so worker exceptions were
# silently dropped; collect all futures and re-raise via .result().
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    futures = [executor.submit(pwork, chunk) for chunk in chunks]
    for future in futures:
        future.result()  # surface any exception raised inside a worker
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment