Created
December 8, 2022 00:08
-
-
Save rabernat/c7ca31ffac475ca84e470ddaa3014f59 to your computer and use it in GitHub Desktop.
S3 Listing Benchmarks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from time import perf_counter | |
import functools | |
from rich import print | |
from math import log | |
import json | |
import uuid | |
import modal | |
import modal.aio | |
# Container image for the remote functions: Debian slim with aiobotocore and
# numpy pip-installed. The stub named "s3-test" uses this image, and every
# @stub.function below is registered on it.
s3_image = modal.Image.debian_slim().pip_install(["aiobotocore", "numpy"]) | |
stub = modal.aio.AioStub("s3-test", image=s3_image) | |
def execution_timer(func):
    """Decorate an async callable so that awaiting it yields a timing record.

    The wrapped coroutine's own return value is discarded. The wrapper
    instead returns a dict holding the wrapped function's name under
    ``"function"``, the elapsed wall-clock seconds under ``"time"``, and
    every keyword argument the call was made with (positional arguments
    are forwarded to ``func`` but not recorded).
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        started = perf_counter()
        await func(*args, **kwargs)
        elapsed = perf_counter() - started

        # Keyword arguments are merged last, so they take precedence over
        # the "function" and "time" entries if a key ever collides.
        report = {"function": func.__name__, "time": elapsed}
        report.update(kwargs)
        return report

    return wrapper
def generate_tree(depth=1, leaves=2, root: str = "root"):
    """Yield the object keys of a uniform tree, one key per leaf.

    Every level below ``root`` has ``leaves`` children named ``0`` ..
    ``leaves - 1``; the children at the deepest level are named
    ``<n>.leaf``. Path components are joined with ``/``, so a full tree
    yields ``leaves ** depth`` keys.

    Args:
        depth: Number of levels below ``root``. Zero or a negative value
            yields nothing.
        leaves: Number of children at every level.
        root: Prefix placed in front of every generated key.

    Yields:
        Keys such as ``"root/0/1.leaf"`` in depth-first order.
    """
    # Guard non-positive depths: a negative depth would otherwise fall into
    # the recursive branch and never reach a base case.
    if depth <= 0:
        return

    for n in range(leaves):
        if depth == 1:
            yield "/".join((root, f"{n}.leaf"))
        else:
            child_root = "/".join((root, str(n)))
            yield from generate_tree(depth=depth - 1, leaves=leaves, root=child_root)
def tree_size(depth, leaves):
    """Return the number of leaf keys in a tree of the given shape.

    The relationship is ``size = leaves ** depth``, or equivalently
    ``depth = log(size, leaves)``.
    """
    return pow(leaves, depth)
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def create_tree(*, depth: int, leaves: int, root: str):
    """Upload one small object for every key produced by ``generate_tree``.

    All ``put_object`` requests are created up front and awaited together.
    ``return_exceptions=True`` makes ``asyncio.gather`` collect failures
    instead of raising them, and the gathered results are discarded, so
    failed uploads are not reported by this function.

    The function body returns nothing; ``execution_timer`` replaces that
    with a timing record.
    """
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    async with get_session().create_client("s3", region_name="us-east-1") as client:
        # NOTE(review): the payload is the three ASCII bytes "x01", not the
        # single byte \x01 -- confirm whether that is intended.
        uploads = [
            client.put_object(Bucket=bucket, Key=key, Body=b"x01")
            for key in generate_tree(depth, leaves, root)
        ]
        await asyncio.gather(*uploads, return_exceptions=True)
async def list_objects_and_directories(client, bucket, root, include_subdirs=True):
    """List everything under a prefix, following pagination to the end.

    Args:
        client: Object exposing an awaitable ``list_objects_v2(**kwargs)``.
        bucket: Bucket name passed as ``Bucket``.
        root: Key prefix; a trailing ``/`` is appended when missing.
        include_subdirs: When true, no delimiter is sent and the listing
            is flat. When false, ``Delimiter="/"`` is sent so that deeper
            keys are reported as common prefixes instead of objects.

    Returns:
        A ``(objects, directories)`` tuple: the accumulated ``Contents``
        entries of every page, and the ``Prefix`` string of every
        accumulated ``CommonPrefixes`` entry.
    """
    prefix = root if root.endswith("/") else root + "/"
    extra = {} if include_subdirs else {"Delimiter": "/"}

    found = []
    common_prefixes = []
    token = None
    while True:
        params = dict(extra)
        if token:
            params["ContinuationToken"] = token
        page = await client.list_objects_v2(Bucket=bucket, Prefix=prefix, **params)

        found.extend(page.get("Contents", []))
        common_prefixes.extend(page.get("CommonPrefixes", []))

        # Stop on the first page that is not marked as truncated.
        if not page["IsTruncated"]:
            break
        token = page.get("NextContinuationToken")

    return found, [entry["Prefix"] for entry in common_prefixes]
async def count_objects_recursive(client, bucket, root) -> int:
    """Count the objects under ``root`` by walking one directory level at a time.

    Each call lists a single level with the ``/`` delimiter, then recurses
    into every returned common prefix concurrently and adds up the totals.
    """
    objects, subdirs = await list_objects_and_directories(
        client, bucket, root, include_subdirs=False
    )
    nested_totals = await asyncio.gather(
        *[count_objects_recursive(client, bucket, prefix) for prefix in subdirs]
    )
    return len(objects) + sum(nested_totals)
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def list_flat(*, depth: int, leaves: int, root: str):
    """List the whole tree under ``root`` with a flat (no delimiter) listing.

    The number of objects returned is compared with
    ``tree_size(depth, leaves)``.

    Raises:
        ValueError: If fewer than 90% of the expected objects were listed.
    """
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    async with get_session().create_client("s3", region_name="us-east-1") as client:
        # The common-prefix half of the result is not needed here.
        objects, _ = await list_objects_and_directories(
            client, bucket, root, include_subdirs=True
        )
        found = len(objects)
        expected = tree_size(depth, leaves)
        if found / expected < 0.9:
            raise ValueError(f"Expected {expected} objects, got {found} objects")
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def list_recursive(*, depth: int, leaves: int, root: str):
    """Count the tree under ``root`` by recursive, delimiter-based listing.

    The count from ``count_objects_recursive`` is compared with
    ``tree_size(depth, leaves)``.

    Raises:
        ValueError: If fewer than 90% of the expected objects were counted.
    """
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    async with get_session().create_client("s3", region_name="us-east-1") as client:
        found = await count_objects_recursive(client, bucket, root)
        expected = tree_size(depth, leaves)
        if found / expected < 0.9:
            raise ValueError(f"Expected {expected} objects, got {found} objects")
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def delete_tree(*, depth: int, leaves: int, root: str):
    """Delete every key that ``generate_tree`` produces for this tree shape.

    The keys are regenerated from ``depth``, ``leaves`` and ``root``
    rather than listed from the bucket. All ``delete_object`` requests
    are awaited together; ``return_exceptions=True`` means failures are
    collected by ``asyncio.gather`` and then discarded, not raised.
    """
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    async with get_session().create_client("s3", region_name="us-east-1") as client:
        deletions = [
            client.delete_object(Bucket=bucket, Key=key)
            for key in generate_tree(depth, leaves, root)
        ]
        await asyncio.gather(*deletions, return_exceptions=True)
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
async def clear_all(*, root: str):
    """Delete every object currently listed under ``root``.

    The keys come from a flat listing of the bucket, so any object under
    the prefix is removed. The number of objects found is printed before
    deletion. Delete failures are collected by ``asyncio.gather`` and
    discarded, not raised.
    """
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    async with get_session().create_client("s3", region_name="us-east-1") as client:
        listed, _ = await list_objects_and_directories(
            client, bucket, root, include_subdirs=True
        )
        print("Deleting", len(listed), "objects")
        deletions = [
            client.delete_object(Bucket=bucket, Key=entry["Key"]) for entry in listed
        ]
        await asyncio.gather(*deletions, return_exceptions=True)
async def benchmark_run(*, depth: int, leaves: int, nruns=4):
    """Run one create / list / delete benchmark cycle for a single tree shape.

    The tree is created under a fresh ``tree/<uuid hex>`` prefix, listed
    ``nruns`` times with both the flat and the recursive strategy, and
    then deleted. Each step's result is printed as it arrives.

    Returns:
        The step results in execution order: create, then a flat and a
        recursive entry per run, then delete.
    """
    root = f"tree/{uuid.uuid4().hex}"
    print("Total Nodes:", tree_size(depth, leaves), "Depth:", depth, "Leaves:", leaves, "Root:", root)

    shape = {"depth": depth, "leaves": leaves, "root": root}
    timings = []

    created = await create_tree(**shape)
    timings.append(created)
    print(created)

    await asyncio.sleep(1)  # give s3 some time to catch up

    for _ in range(nruns):
        for lister in (list_flat, list_recursive):
            listed = await lister(**shape)
            print(listed)
            timings.append(listed)

    deleted = await delete_tree(**shape)
    print(deleted)
    timings.append(deleted)
    return timings
async def main():
    """Benchmark every tree shape for two total sizes and save the results.

    For total sizes ``2**14`` and ``2**16``, every depth that evenly
    divides the exponent gets one ``benchmark_run`` with
    ``leaves = 2 ** (exponent // depth)``, so all shapes of a given size
    hold the same number of objects. The runs execute concurrently, and
    each result list is written to
    ``s3_listing_results/depth-<d>_leaves-<l>.json`` as soon as it
    completes.
    """
    import os

    # Create the output directory before any remote work starts; without
    # it, open() would fail only after a benchmark had already run.
    os.makedirs("s3_listing_results", exist_ok=True)

    async with stub.run():
        # NOTE(review): this clears the "root/" prefix, while benchmark_run
        # writes under "tree/<uuid>" -- confirm which prefix should be cleared.
        await clear_all(root="root")

        coros = []
        for max_depth in range(14, 17, 2):
            size = 2**max_depth
            for depth in range(1, max_depth + 1):
                # Only depths that divide the exponent give an exact size.
                if max_depth % depth != 0:
                    continue
                leaves = 2 ** (max_depth // depth)
                assert leaves**depth == size
                coros.append(benchmark_run(depth=depth, leaves=leaves))

        for task in asyncio.as_completed(coros):
            result = await task
            # The first entry is the create step; it carries the shape kwargs.
            fname = f"s3_listing_results/depth-{result[0]['depth']}_leaves-{result[0]['leaves']}.json"
            print("Writing", fname)
            with open(fname, mode="w") as fp:
                json.dump(result, fp)
# Script entry point: when the file is executed directly, run the async
# main() driver to completion with asyncio.run.
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment