Bulk delete files from an AWS S3 bucket in Python using multiple threads via a parallel (eventually) depth-first search
# Deletes objects in bulk using boto3's delete_objects, but using multiple threads to achieve some
# parallelism - in spite of the GIL, multiple HTTP requests to S3 should happen at the same time.
# Instead of looping over all keys under the root prefix, it walks the tree of keys of delimiter-
# defined "folders" in a depth-first way, which allows each page to be processed by a separate
# thread as it's discovered. Depth-first is chosen because the depth is limited by the maximum key
# size of 1024 in S3, which means that there is a limit to the memory used by the algorithm to
# store the next requests to make. This would not be the case with breadth-first because there is
# no limit to how many keys are in any folder.
#
# To do the search in parallel, each bit of work (i.e. an HTTP request to fetch a page of keys
# under a prefix) in the tree is assigned a priority, added to a queue, and the work with the
# lowest valued priority in the queue is chosen to be done when a worker becomes available.
#
# The priority of each request is a tuple, where each item is itself a tuple of the index of the
# prefix in the page in which it appeared, and the current page number under that prefix. Going
# to the next page increments the page number, and going "deeper" into the tree appends another
# tuple to the priority. For example, if we set page_size=2, then an entire tree of prefixes and
# priorities could look like the below.
#
# Prefix                      Priority
# root/                       (0,0),
# ├─ apples/                  (0,0), (0,0),
# │  ├─ ash/                  (0,0), (0,0), (0,0),
# │  ├─ beech/                (0,0), (0,0), (1,0),
# │  apples/ (next page)      (0,0), (0,1),
# │  ├─ cedar/                (0,0), (0,1), (0,0),
# │  └─ douglas-fir/          (0,0), (0,1), (1,0),
# ├─ bananas/                 (0,0), (1,0),
# root/ (next page)           (0,1),
# ├─ cherries/                (0,1), (0,0),
# └─ dates/                   (0,1), (1,0),
#
# At any given moment only some of the tree is known, and because nodes are explored in parallel,
# at any given moment several requests that correspond to exploring several nodes could be
# in-flight; the tree is not even discovered in a deterministic order. However, the priorities are
# constructed so that the minimum of the ones in the queue corresponds to the best node to explore
# to make the search as depth-first as possible. As nodes are explored and new priorities are
# added to the queue, this minimum can change and even be smaller than already explored or
# in-flight nodes. So the search is maybe better described as "eventually depth-first".
#
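# For example, using the tree above, Python's tuple ordering ranks the priorities as
#
#   ((0,0), (0,0), (0,0))  root/apples/ash/
# < ((0,0), (0,1))         root/apples/ (next page)
# < ((0,0), (1,0))         root/bananas/
# < ((0,1),)               root/ (next page)
#
# so work deeper in the tree always sorts before later sibling prefixes and before later pages.
#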
# This priority construction also has the property that going to the next page under a prefix does
# not increase the length of the priority itself - important when chains of pages are not limited
# in their length. The length of the priority does increase with the depth in the tree, but as
# mentioned above this is limited due to the limit on the length of keys. Technically Python
# requires more memory to store larger integers, but to store the index of page 1,000,000,000
# takes the same memory as to store 1 (28 bytes), and so it can in this case be treated as
# constant for all reachable page indexes.
#
# From some light testing this deletes between 2,000 and 6,000 objects per second, and works best
# if you have objects distributed into folders/CommonPrefixes as specified by the delimiter.
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from dataclasses import dataclass, field
from functools import partial
from typing import Callable, Optional, Tuple
from queue import PriorityQueue

import boto3


def bulk_delete(
        bucket, prefix,
        workers=8, page_size=1000, delimiter='/',
        get_s3_client=lambda: boto3.client('s3'), on_delete=lambda num: None,
):
    s3 = get_s3_client()
    queue = PriorityQueue()

    @dataclass(order=True)
    class Work:
        # A tuple that determines the priority of the bit of work in "func". This is a sort of
        # "coordinate" in the paginated node tree that prioritises a depth-first search.
        priority: Tuple[Tuple[int,int], ...]
        # The work function itself, which fetches a page of Keys/CommonPrefixes, or deletes a page
        func: Optional[Callable[[], None]] = field(compare=False)

    # A sentinel "stop" Work instance with priority chosen to be before all work. So when it's
    # queued the workers will stop at the very next opportunity
    stop = Work(((-1,-1),), None)

    def do_work():
        while (work := queue.get()) is not stop:
            work.func()
            queue.task_done()
            with queue.mutex:
                unfinished_tasks = queue.unfinished_tasks
            if unfinished_tasks == 0:
                for _ in range(0, workers):
                    queue.put(stop)

    def list_iter(prefix):
        return iter(s3.get_paginator('list_objects_v2').paginate(
            Bucket=bucket, Prefix=prefix,
            Delimiter=delimiter, MaxKeys=page_size, PaginationConfig={'PageSize': page_size},
        ))

    def delete_page(page):
        s3.delete_objects(Bucket=bucket, Delete={'Objects': page})
        on_delete(len(page))

    def process_page(page_iter, priority):
        try:
            page = next(page_iter)
        except StopIteration:
            return

        # Deleting a page is done at the same priority as this function. It will often be done
        # straight after this call because this call must have been the highest priority for it to
        # have run, but there could be unfinished nodes earlier in the depth-first search that have
        # since submitted work, and so would be prioritised over the deletion
        if contents := page.get('Contents'):
            delete_priority = priority
            queue.put(Work(
                priority=delete_priority,
                func=partial(delete_page, [{'Key': obj['Key']} for obj in contents]),
            ))

        # Processing child prefixes is done after deletion and in order. Importantly, anything each
        # child prefix itself enqueues should be done before the work of any later child prefixes
        # to make it a depth-first search. Appending the index of the child prefix to the priority
        # tuple of this function does this, because the work inside each child prefix will only
        # ever enqueue work at its priority or greater, but always less than the priority of
        # subsequent child prefixes or the next page.
        for prefix_index, obj in enumerate(page.get('CommonPrefixes', [])):
            prefix_priority = priority + ((prefix_index,0),)
            queue.put(Work(
                priority=prefix_priority,
                func=partial(process_page,
                             page_iter=list_iter(obj['Prefix']), priority=prefix_priority),
            ))

        # The next page in pagination for this prefix is processed after the delete for this page,
        # after all the child prefixes are processed, and after anything the child prefixes
        # themselves enqueue.
        next_page_priority = priority[:-1] + ((priority[-1][0], priority[-1][1] + 1),)
        queue.put(Work(
            priority=next_page_priority,
            func=partial(process_page, page_iter=page_iter, priority=next_page_priority),
        ))

    with ThreadPoolExecutor(max_workers=workers) as worker_pool:
        # Bootstrap with the first page
        priority = ((0,0),)
        queue.put(Work(
            priority=priority,
            func=partial(process_page, page_iter=list_iter(prefix), priority=priority),
        ))

        # Run workers, waiting for the first exception, if any, raised by them
        done, _ = wait(
            tuple(worker_pool.submit(do_work) for _ in range(0, workers)),
            return_when=FIRST_EXCEPTION,
        )

        # If an exception was raised, stop all the other workers, because otherwise exiting the
        # ThreadPoolExecutor context would block, and re-raise the exception
        if e := next(iter(done)).exception():
            for _ in range(0, workers - 1):
                queue.put(stop)
            raise e from None
@michalc will this delete all the objects from a versioned bucket?
@tanayagrawa1 If you mean all the versions of objects in a versioned bucket, then no. Suspect it shouldn't be too much work to tweak it to use `list_object_versions` rather than `list_objects_v2` to do this, although shouldn't forget about the delete markers.

A rough, untested, initial guess as to the changes needed:
- Change the code here from `list_objects_v2` to `list_object_versions`
- Change the code here to:

      if contents := page.get('Versions', []) + page.get('DeleteMarkers', []):
          delete_priority = priority
          queue.put(Work(
              priority=delete_priority,
              func=partial(delete_page, [{'Key': obj['Key'], 'VersionId': obj['VersionId']} for obj in contents]),
          ))
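For illustration (rough and untested), the first change would make `list_iter` use the `list_object_versions` paginator with the same pagination parameters:

```python
def list_iter(prefix):
    return iter(s3.get_paginator('list_object_versions').paginate(
        Bucket=bucket, Prefix=prefix,
        Delimiter=delimiter, MaxKeys=page_size, PaginationConfig={'PageSize': page_size},
    ))
```

Pages from `list_object_versions` still include `CommonPrefixes` when a `Delimiter` is given, so the rest of `process_page` should carry over unchanged.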
This works as expected, thanks a lot! One thing I noticed is that with `workers=8`, the S3 API rate limit is reached (tested on dirs with ~300K objects which sum up to ~1 TB per dir). Reducing the worker count to 4 resolved the rate limit issue. Also, the script deletes each of the specified dirs within only 2 minutes, which is very impressive.
Usage
To delete everything under the `my-prefix/` prefix in `my-bucket` you can use the `bucket` and `prefix` parameters:
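A minimal sketch of such a call, assuming the function above is importable and AWS credentials are configured in the environment:

```python
bulk_delete(bucket='my-bucket', prefix='my-prefix/')
```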
Or to do the same thing, but integrated with tqdm to give feedback when run from the command line, you can use the `on_delete` parameter:
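A sketch of what this could look like, assuming tqdm is installed; the total isn't known up front, so the bar just counts deleted objects:

```python
from tqdm import tqdm

with tqdm(unit=' objects', desc='Deleting') as progress:
    # on_delete is called with the number of objects deleted in each batch
    bulk_delete(bucket='my-bucket', prefix='my-prefix/', on_delete=progress.update)
```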
Or to customise the boto3 S3 client that's used, you can use the `get_s3_client` parameter:
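For example, to pin the client to a particular region (the region below is just illustrative), something like:

```python
import boto3

def get_s3_client():
    # Any boto3 S3 client configuration can go here; the region is an illustrative choice
    return boto3.client('s3', region_name='eu-west-2')

bulk_delete(bucket='my-bucket', prefix='my-prefix/', get_s3_client=get_s3_client)
```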