Last active
February 20, 2024 07:53
-
-
Save RhetTbull/4510d5f9912b9c703d34dac4c1afc16a to your computer and use it in GitHub Desktop.
Python method to copy a file with a callback (e.g. to show a progress bar). Includes example of copying with both click.progressbar and tqdm.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Copy a file with callback (E.g. update a progress bar) """ | |
# based on flutefreak7's answer at StackOverflow | |
# https://stackoverflow.com/questions/29967487/get-progress-back-from-shutil-file-copy-thread/48450305#48450305 | |
# License: MIT License | |
import os | |
import pathlib | |
import shutil | |
# Default number of bytes to read at once (and to copy between callback calls).
# shutil.copy uses 1024 * 1024 if _WINDOWS else 64 * 1024;
# however, in my testing on macOS with an SSD, I've found a much larger buffer is faster.
BUFFER_SIZE = 4096 * 1024  # 4 MiB
class SameFileError(shutil.SameFileError):
    """Raised when source and destination are the same file.

    Derives from ``shutil.SameFileError`` (itself an ``OSError``) so that
    callers which already handle the standard-library exception, or plain
    ``OSError``, also catch this one.
    """
class SpecialFileError(shutil.SpecialFileError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe).

    Derives from ``shutil.SpecialFileError`` (itself an ``OSError``) so that
    callers which already handle the standard-library exception, or plain
    ``OSError``, also catch this one.
    """
def copy_with_callback(
    src, dest, callback=None, follow_symlinks=True, buffer_size=BUFFER_SIZE
):
    """Copy a file, optionally reporting progress through a callback.

    callback, if provided, must be a callable and will be
    called after every buffer_size bytes are copied.

    Args:
        src: source file; must be an existing regular file
        dest: destination path; if an existing directory,
            file will be copied to the directory;
            if it is not a directory, assumed to be destination filename
        callback: callable to call after every buffer_size bytes are copied;
            it will be called as callback(bytes copied since last callback,
            total bytes copied, total bytes in source file)
        follow_symlinks: bool; if True, follows symlinks; if False and src is
            a symlink, a symlink with the same target is created at the
            destination instead of copying the file contents
        buffer_size: how many bytes to copy before each call to the callback,
            default = 4 MiB; must not be 0

    Returns:
        Full path to destination file (as a str)

    Raises:
        FileNotFoundError if src doesn't exist or is not a regular file
        SameFileError if src and dest are the same file
        SpecialFileError if src or dest is a named pipe
        ValueError if callback is not callable or buffer_size is 0

    Note: Does not copy extended attributes, resource forks or other metadata;
    only the file contents and permission bits are copied.
    """
    # Function-scope import: ``stat`` is the public home of S_ISFIFO, whereas
    # ``shutil.stat`` only works because shutil happens to import it.
    import stat

    def _is_named_pipe(path):
        """Return True if path exists and is a FIFO; False if it can't be stat'ed."""
        try:
            mode = os.stat(str(path)).st_mode
        except OSError:
            # File most likely does not exist
            return False
        return stat.S_ISFIFO(mode)

    srcfile = pathlib.Path(src)
    destpath = pathlib.Path(dest)

    # The FIFO check for src must run before the is_file() check: is_file() is
    # False for a named pipe, which would otherwise be misreported as missing.
    if _is_named_pipe(srcfile):
        raise SpecialFileError(f"`{srcfile}` is a named pipe")

    if not srcfile.is_file():
        raise FileNotFoundError(f"src file `{src}` doesn't exist")

    destfile = destpath / srcfile.name if destpath.is_dir() else destpath

    if destfile.exists() and srcfile.samefile(destfile):
        raise SameFileError(
            f"source file `{src}` and destinaton file `{dest}` are the same file."
        )

    if _is_named_pipe(destfile):
        raise SpecialFileError(f"`{destfile}` is a named pipe")

    if callback is not None and not callable(callback):
        raise ValueError("callback is not callable")

    # read(0) returns b"" immediately, so a zero buffer would silently leave
    # an empty destination; reject it before the destination is touched.
    if buffer_size == 0:
        raise ValueError("buffer_size must not be 0")

    if not follow_symlinks and srcfile.is_symlink():
        # lexists() does not follow links, so a dangling symlink already at
        # the destination is removed too instead of making os.symlink fail.
        if os.path.lexists(str(destfile)):
            os.unlink(str(destfile))
        os.symlink(os.readlink(str(srcfile)), str(destfile))
    else:
        size = os.stat(str(srcfile)).st_size
        with open(srcfile, "rb") as fsrc, open(destfile, "wb") as fdest:
            _copyfileobj(
                fsrc, fdest, callback=callback, total=size, length=buffer_size
            )
        shutil.copymode(str(srcfile), str(destfile))

    return str(destfile)
def _copyfileobj(fsrc, fdest, callback, total, length):
    """Stream the contents of fsrc into fdest one chunk at a time.

    Args:
        fsrc: filehandle to read from
        fdest: filehandle to write to
        callback: callable invoked after each chunk is written as
            callback(chunk size, running total copied, total), or None
            to copy without reporting progress
        total: total bytes in source file (passed through to callback)
        length: how many bytes to request per read (between calls to callback)
    """
    read_chunk = fsrc.read
    write_chunk = fdest.write
    report_progress = callback is not None

    bytes_done = 0
    chunk = read_chunk(length)
    while chunk:
        write_chunk(chunk)
        chunk_len = len(chunk)
        bytes_done += chunk_len
        if report_progress:
            callback(chunk_len, bytes_done, total)
        chunk = read_chunk(length)
# -------------------- example usage with different progress bars -------------------- | |
import time | |
import click | |
from tqdm import tqdm | |
@click.command(help="Copy a file from SRCFILE to DESTFILE with click.progressbar.")
@click.argument("srcfile", type=click.Path(exists=True, file_okay=True, dir_okay=False))
@click.argument("destfile", type=click.Path())
@click.option("--nofollow", help="Do not follow symlinks.", is_flag=True, default=False)
@click.option("--bufsize", help=f"Buffer size; default is {BUFFER_SIZE}.", type=int)
@click.option(
    "--tqdm",
    "tqdm_",
    help="Use tqdm progress bar instead of click.",
    is_flag=True,
    default=False,
)
@click.option(
    "--noprogress",
    "noprogress",
    help="Don't use a progress bar.",
    is_flag=True,
    default=False,
)
def main(srcfile, destfile, nofollow, tqdm_, noprogress, bufsize):
    """Demo command: copy SRCFILE to DESTFILE via copy_with_callback.

    Progress is shown with click.progressbar by default, with tqdm when
    --tqdm is given, or not at all when --noprogress is given (which takes
    precedence over --tqdm). The elapsed time is echoed when the copy ends.
    """
    # A missing (None) or zero --bufsize falls back to the module default.
    if not bufsize:
        bufsize = BUFFER_SIZE
    size = os.stat(srcfile).st_size
    copy_options = {"follow_symlinks": not nofollow, "buffer_size": bufsize}

    start_t = time.time()
    if noprogress:
        dest = copy_with_callback(srcfile, destfile, callback=None, **copy_options)
    else:
        # Both progress-bar flavours are context managers exposing update(n).
        progress = tqdm(total=size) if tqdm_ else click.progressbar(length=size)
        with progress as bar:

            def advance(copied, total_copied, total):
                return bar.update(copied)

            dest = copy_with_callback(
                srcfile, destfile, callback=advance, **copy_options
            )
    delta_t = time.time() - start_t
    click.echo(f"Done: copied {size} bytes to {dest} in {delta_t:.3f} seconds.")
# Run the click demo command when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment