# Created October 25, 2022 21:52
#
# Save nousr/5a9dc1ad09a4be280247994eb462c0e3 to your computer and use it in GitHub Desktop.
#
# This file contains bidirectional Unicode text that may be interpreted or
# compiled differently than what appears below. To review, open the file in an
# editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import multiprocessing as mp
import os
import posixpath
from time import perf_counter
from typing import Sequence

import click
import fsspec
import huggingface_hub as hf_hub
from braceexpand import braceexpand
from tqdm import tqdm
def make_commit(file_url: str, folder_in_repo: str, queue: mp.Queue):
    """
    Create a single commit operation to upload a file to a repo.

    The file at ``file_url`` is read fully into memory, wrapped in a
    ``CommitOperationAdd`` whose destination is ``folder_in_repo`` joined with
    the file's base name, validated, and then put on ``queue``.

    Args:
        file_url: URL of the file to read; resolved through fsspec.
        folder_in_repo: Destination folder inside the repo.
        queue: Queue that receives the validated commit operation.

    Raises:
        ValueError: If reading, building or validating the operation raises a
            ``ValueError``; the original error is attached as the cause.

    Any other exception is printed and the file is skipped, so one bad file
    does not abort the rest of the batch.
    """
    filesystem, path = fsspec.core.url_to_fs(file_url)
    filename = os.path.basename(path)
    # Paths inside the repo are "/"-separated regardless of the host OS, so
    # join with posixpath rather than os.path (which emits "\\" on Windows).
    path_in_repo = posixpath.join(folder_in_repo, filename)

    try:
        with filesystem.open(path, "rb") as f:
            operation = hf_hub.CommitOperationAdd(
                path_in_repo=path_in_repo,
                path_or_fileobj=f.read(),
            )
        operation.validate()
        queue.put(operation)
    except ValueError as err:
        # Keep the original error as the cause so the traceback shows why.
        raise ValueError(f"Invalid file {file_url}") from err
    except Exception as e:
        # Best effort: report which file failed and carry on.
        print(f"Error processing {file_url}: {e}", flush=True)
def upload_step(
    urls: Sequence[str],
    repo_id: str,
    batch_id: int,
    folder_in_repo: str,
    upload_chunk_size: int,
    num_threads: int,
    commit_description: str,
    commit_message: str,
):
    """
    Upload a batch of files to a repo.

    A pool of ``num_threads`` spawned worker processes runs ``make_commit``
    once per URL; the resulting commit operations are collected through a
    manager queue and then committed to the dataset repo in slices of
    ``upload_chunk_size`` operations.

    Args:
        urls: File URLs that make up this batch.
        repo_id: Target repo; passed to ``create_commit`` with
            ``repo_type="dataset"``.
        batch_id: Identifier of this batch; only used in the commit message.
        folder_in_repo: Destination folder inside the repo.
        upload_chunk_size: Maximum number of operations per commit.
        num_threads: Pool size, also forwarded to ``create_commit``.
        commit_description: Description attached to every commit.
        commit_message: Prefix of every commit message.

    Raises:
        ValueError: If ``upload_chunk_size`` is not a positive integer.
    """
    # Fail before reading any file: a zero step would only blow up in range()
    # after the whole batch was loaded, and a negative one would silently
    # upload nothing.
    if upload_chunk_size < 1:
        raise ValueError(
            f"upload_chunk_size must be a positive integer, got {upload_chunk_size}"
        )

    start = perf_counter()

    # The manager is backed by its own server process; using it as a context
    # manager shuts that process down once the queue has been drained.
    with mp.Manager() as manager:
        queue = manager.Queue()

        # Create a commit operation for each file using a Pool
        with mp.get_context(method="spawn").Pool(num_threads) as pool:
            pool.starmap(
                make_commit,
                tqdm([(file_url, folder_in_repo, queue) for file_url in urls]),
            )

        print(
            f"Created {queue.qsize()} commit operations in {perf_counter()-start} seconds, uploading to repo...",
            flush=True,
        )

        # Drain the queue into a plain list while the manager is still alive.
        operations = [queue.get() for _ in range(queue.qsize())]

    # Upload the operations in chunks; chunk_id is the start offset of the
    # chunk within this batch's operations.
    for chunk_id in tqdm(range(0, len(operations), upload_chunk_size)):
        hf_hub.create_commit(
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations[chunk_id : chunk_id + upload_chunk_size],
            commit_description=commit_description,
            commit_message=f"{commit_message} batch_id: {batch_id} - chunk_id: {chunk_id}",
            create_pr=False,
            num_threads=num_threads,
        )

    print(
        f"All files in this batch were completed after {perf_counter()-start} seconds.",
        flush=True,
    )
@click.command()
@click.option("--s3-path", required=True, type=str)
@click.option("--repo-id", required=True, type=str)
@click.option("--folder-in-repo", required=True, type=str)
@click.option(
    "--num-threads",
    required=True,
    type=click.IntRange(min=1),
    default=os.cpu_count(),
)
@click.option("--commit-message", required=True, type=str)
@click.option("--commit-description", required=True, type=str)
@click.option("--files-in-ram", required=True, type=click.IntRange(min=1))
@click.option("--upload-chunk-size", required=True, type=click.IntRange(min=1))
def main(
    s3_path,
    repo_id,
    folder_in_repo,
    num_threads,
    commit_message,
    commit_description,
    files_in_ram,
    upload_chunk_size,
):
    """
    Upload files to a HuggingFace Hub repo.

    s3_path: path to the files to upload in s3 (brace patterns are expanded)

    repo_id: the repo to upload to in huggingface "username/repo_name"

    folder_in_repo: the folder in the repo to upload to

    num_threads: number of threads to use for uploading & generating commit operations (must be >= 1)

    commit_message: the commit message to use for each commit

    commit_description: the commit description to use for each commit

    files_in_ram: will essentially determine how many files to store in RAM at a time (must be >= 1).

    upload_chunk_size: how many files to upload in a single commit (must be >= 1).
    """
    file_urls = list(braceexpand(s3_path))
    total_files = len(file_urls)

    # batch_id is the index of the first file of the batch, not a running count.
    for batch_id in tqdm(range(0, total_files, files_in_ram)):
        # Clamp so the final, possibly shorter batch reports its real end.
        batch_end = min(batch_id + files_in_ram, total_files)
        print(f"Uploading files {batch_id} to {batch_end}...", flush=True)

        upload_step(
            urls=file_urls[batch_id:batch_end],
            repo_id=repo_id,
            batch_id=batch_id,
            folder_in_repo=folder_in_repo,
            upload_chunk_size=upload_chunk_size,
            num_threads=num_threads,
            commit_description=commit_description,
            commit_message=commit_message,
        )
# Run the CLI only when this file is executed as a script. The guard also keeps
# the "spawn"-ed worker processes, which re-import this module, from starting
# the upload again.
if __name__ == "__main__":
    main()
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.