Created
February 24, 2025 21:49
-
-
Save jsbueno/515d79336437ad09dac2ee52f05a2e50 to your computer and use it in GitHub Desktop.
ssincronous compatible "shutils.copytree" wrapper, allowing for progress indication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This asynchronous version is really naive: | |
it will block the asyncio loop while walkign the file tree - TWICE - | |
and then, the defautl call will print the progress to the terminal | |
pass a callback as "progress" parameter which takes a dictionary | |
describing the current progress as its sole parameter: | |
the callback will be called for each file copied during execution. | |
""" | |
import os, shutil, asyncio | |
async def acopytree(src, dst, *, totalize_first=True, progress=None, executor=None, **kwargs): | |
files = {"count": 0, "size": 0, "copied": 0, "copied_bytes": 0} | |
tasks = set() | |
loop = asyncio.get_running_loop() | |
copy_function = kwargs.pop("copy_function", shutil.copy2) | |
def count_only(src, dst, *, follow_symlinks=True): | |
files["count"] += 1 | |
files["size"] += os.stat(src, follow_symlinks=follow_symlinks).st_size | |
if totalize_first: | |
# FIXME: blocking call | |
shutil.copytree(src, dst, copy_function=count_only, dirs_exist_ok=kwargs.get("dirs_exist_ok", False)) | |
def default_progress(files): | |
copied = files["copied"]; total_files=files["count"] | |
byte_size = files["size"]; copied_bytes = files["copied_bytes"] | |
byte_size += 1 if not byte_size else 0 | |
print(f"\r Files Copied: {copied} / {total_files}, total: {copied_bytes / byte_size * 100:3.01f}% ", flush=True, end="") | |
if not progress: | |
progress = default_progress | |
def copy_bridge(src, dst, *, follow_symlinks=True): | |
task = loop.run_in_executor(executor, lambda: copy_function(src, dst, follow_symlinks=follow_symlinks)) | |
tasks.add(task) | |
def post_copy(task): | |
# this is executed in the asyncio loop thread - otherwise we'd need locks! | |
ammount_copied = os.stat(dst).st_size | |
files["copied_bytes"] += ammount_copied | |
files["copied"] += 1 | |
progress(files) | |
task.add_done_callback(post_copy) | |
# Will synchronously walk the tree, and schedule all file copies as tasks:. (walking the file tree will block) | |
kwargs["dirs_exist_ok"] = True # counting the files above will have created directories already | |
shutil.copytree(src, dst, copy_function=copy_bridge, **kwargs) | |
return asyncio.gather(*tasks) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment