Last active
December 12, 2023 18:48
-
-
Save brews/0a42fc5d0c276930c7e4db581cebe99e to your computer and use it in GitHub Desktop.
Fills in implied directory blobs for a GCS bucket mounted with GCSfuse without using the --implicit-dirs option.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Fills in implied directory blobs for a GCS bucket mounted with GCSfuse without using the --implicit-dirs option.
If you tell it to look in `mygcsbucket` for prefix `path/to/files/to/read/in/gcsfuse/`, it will
ensure implied dirs nested in gs://mygcsbucket/path/to/files/to/read/in/gcsfuse/ get covered.
It can handle 10 - 100k directories in under 30 minutes if you run it from Cloud Shell.
"""
import logging | |
from pathlib import Path | |
from collections.abc import Iterator | |
from google.cloud import storage | |
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

# Bucket that is scanned when this file is executed as a script.
BUCKET_NAME = "mygcsbucket"
# Object-name prefix that is scanned when this file is executed as a script.
PREFIX = "path/to/files/to/read/in/gcsfuse/"
# TODO: Cache lookups to reduce the number of API calls.
# TODO: Consider async / a thread pool for throughput (or rewrite this in Go).
def find_implied_dirs(blob_name: str) -> Iterator[str]:
    """
    Yield every implied directory name contained in a bucket blob name.

    Directories are yielded from shallowest to deepest, each with a trailing
    "/" so the value can be used directly as a placeholder blob name. For
    example ``"a/b/c.txt"`` yields ``"a/"`` and then ``"a/b/"``.

    A blob that sits at the bucket root (no "/" in its name) has no implied
    directories, so nothing is yielded for it.
    """
    parent_parts = Path(blob_name).parent.parts
    # A root-level blob has parent "." whose parts are empty, so the loop
    # below yields nothing for it. (Previously a bare ``yield`` emitted None
    # here, which then leaked into the caller's candidate list.)
    for depth in range(1, len(parent_parts) + 1):
        # as_posix() keeps "/" separators whatever the host OS is; the
        # trailing "/" is what marks the blob as a directory placeholder.
        yield Path(*parent_parts[:depth]).as_posix() + "/"
def main(bucket_name: str, prefix: str, dryrun: bool = False) -> None:
    """
    Create empty placeholder blobs for implied directories under a prefix.

    Lists every blob in ``bucket_name`` whose name starts with ``prefix``,
    collects the implied directory names of those blobs, and uploads an empty
    blob for each directory name that does not already exist.

    Args:
        bucket_name: Name of the bucket to process.
        prefix: Only blobs whose names start with this prefix are listed.
        dryrun: When true, nothing is uploaded; the directories that would be
            created are only logged.
    """
    _log.info(f"Begining to process gs://{bucket_name}/{prefix}")
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    # Get a sorted, unique list of all the implied directories we need to
    # consider. Falsy entries (None / "") are dropped: they are not valid blob
    # names and a None mixed with strings would make sorted() raise TypeError.
    candidates = sorted(
        {
            implied_dir
            for blob in bucket.list_blobs(prefix=prefix)
            for implied_dir in find_implied_dirs(blob.name)
            if implied_dir
        }
    )
    _log.info(f"Found n={len(candidates)}")

    for candidate in candidates:
        _log.debug(f"Processing {candidate}")
        dir_blob = bucket.blob(candidate)

        # Only directories without an existing placeholder need a blob.
        if dir_blob.exists():
            _log.debug(f"Placeholder blob already exists for {dir_blob.name}, skipping")
            continue

        if dryrun:
            # Report what would happen without claiming a blob was created.
            _log.info(f"Dry run: would create blob for {dir_blob.name}")
            continue

        # Create empty blob as dir placeholder.
        dir_blob.upload_from_string("", content_type="application/x-www-form-urlencoded;charset=UTF-8")
        _log.info(f"Created blob for {dir_blob.name}")

    _log.info(f"Done processing gs://{bucket_name}/{prefix}")
if __name__ == "__main__":
    # Script entry point: configure INFO-level logging, then process the
    # bucket and prefix configured at module level.
    logging.basicConfig(
        level="INFO",
        format='%(asctime)s - %(name)s - %(levelname)s:%(message)s',
    )
    main(BUCKET_NAME, PREFIX)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment