Created
July 4, 2024 18:29
-
-
Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.
clean_output_dir()/utils.py performance test for getpelican/pelican repo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations

import logging
import os
import pathlib
import shutil
import time
from collections.abc import Iterable
from datetime import datetime as dt
from datetime import timedelta
# Logging setup for the benchmark: messages are prefixed with the logger name.
LOG_FORMAT = "%(name)s: %(message)s"
level = logging.INFO  # numeric value 20

# No custom handler is passed; logging.basicConfig() falls back to its
# default handler, which keeps this stand-alone benchmark free of extra setup.
logging.basicConfig(level=level, format=LOG_FORMAT, datefmt="[%H:%M:%S]")

# Logger shared by both clean_output_dir variants.
logger = logging.getLogger("os-listdir")
logger.setLevel(level)
def clean_output_dir_original(path: str, retention: Iterable[str]) -> None:
    """Remove everything inside ``path`` except entries named in ``retention``.

    This is the ``os.listdir``/``os.path`` based variant used as the
    baseline of the benchmark. The directory ``path`` itself is kept; only
    its direct children are deleted. Deletion failures are logged and never
    raised.

    Args:
        path: Output directory to clean. If it does not exist nothing is
            done; if it is not a directory it is removed with ``os.remove``.
        retention: Names (not full paths) of direct children of ``path``
            that must be left untouched. Any iterable is accepted, including
            a one-shot iterator.
    """
    if not os.path.exists(path):
        logger.info("Directory already removed: %s", path)
        return

    if not os.path.isdir(path):
        try:
            os.remove(path)
        except Exception as e:
            logger.error("Unable to delete file %s; %s", path, e)
        return

    # Materialise once: a generator would otherwise be exhausted by the first
    # entry and silently stop protecting the remaining retained names.
    retained = set(retention)

    # remove existing content from output folder unless in retention list
    for filename in os.listdir(path):
        file = os.path.join(path, filename)
        if filename in retained:
            logger.info(
                "Skipping deletion; %s is on retention list: %s", filename, file
            )
        elif os.path.islink(file) or os.path.isfile(file):
            # Links are tested before directories: os.path.isdir() follows
            # symlinks and shutil.rmtree() refuses to operate on a link, so a
            # link to a directory must be unlinked here, leaving its target.
            try:
                os.remove(file)
                logger.debug("Deleted file/link %s", file)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", file, e)
        elif os.path.isdir(file):
            try:
                shutil.rmtree(file)
                logger.info("Deleted directory %s", file)
            except Exception as e:
                logger.error("Unable to delete directory %s; %s", file, e)
        else:
            logger.error("Unable to delete %s, file type unknown", file)
def clean_output_dir_new(path: str, retention: Iterable[str]) -> None:
    """Remove everything inside ``path`` except entries named in ``retention``.

    This is the ``os.scandir`` based variant. The directory ``path`` itself
    is kept; only its direct children are deleted. Deletion failures are
    logged and never raised.

    Args:
        path: Output directory to clean. If it does not exist nothing is
            done; if it is not a directory it is removed with ``os.remove``.
        retention: Names (not full paths) of direct children of ``path``
            that must be left untouched. Any iterable is accepted, including
            a one-shot iterator.
    """
    if not os.path.exists(path):
        logger.debug("Directory already removed: %s", path)
        return

    if not os.path.isdir(path):
        try:
            os.remove(path)
        except Exception as e:
            logger.error("Unable to delete file %s; %s", path, e)
        return

    # Materialise once: a generator would otherwise be exhausted by the first
    # entry and silently stop protecting the remaining retained names.
    retained = set(retention)

    # Snapshot the entries and close the directory handle before deleting,
    # so nothing is removed while the directory stream is still open.
    with os.scandir(path) as scan_it:
        entries = list(scan_it)

    # remove existing content from output folder unless in retention list
    for dir_entry in entries:
        file = dir_entry.path
        if dir_entry.name in retained:
            logger.debug(
                "Skipping deletion; %s is on retention list: %s", dir_entry.name, file
            )
        elif dir_entry.is_symlink() or dir_entry.is_file(follow_symlinks=False):
            # Links are tested before directories: shutil.rmtree() refuses to
            # operate on a link, so a link to a directory must be unlinked
            # here, leaving its target untouched.
            try:
                os.remove(file)
                logger.debug("Deleted file/link %s", file)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", file, e)
        elif dir_entry.is_dir(follow_symlinks=False):
            try:
                shutil.rmtree(file)
                logger.debug("Deleted directory %s", file)
            except Exception as e:
                logger.error("Unable to delete directory %s; %s", file, e)
        else:
            logger.error("Unable to delete %s, file type unknown", file)
# Performance test fixtures: file-name alphabet, scratch location and the
# running totals that are printed at the end of the benchmark.
alphanum = 'abcdefghijklmnopqrstuvwxyz0123456789'
DIR_SEPARATOR = '/'
TEST_DIR = '/tmp/output'

# Start from a clean slate. Only call rmtree when something is there: with
# ignore_errors=False it raises FileNotFoundError on a first run, while real
# failures (e.g. permissions) on an existing tree are still reported.
if os.path.exists(TEST_DIR):
    shutil.rmtree(path=TEST_DIR, ignore_errors=False)

total_files = 0
total_dirs = 0
total_inodes = 0
def populate_files(test_path):
    """Fill ``test_path`` with empty files named by 1-3 characters of ``alphanum``.

    For every character ``a`` a file ``a`` is created, followed by ``ab`` for
    every ``b`` and ``abc`` for every ``c``, all directly inside
    ``test_path``. The module-level counters are updated as a side effect:
    ``total_files`` per touched file, ``total_dirs`` when the directory is
    actually created here, and ``total_inodes`` for both.

    Args:
        test_path: Directory to populate; it is created if missing.
    """
    global total_dirs
    global total_files
    global total_inodes

    def touch(name):
        """Create the empty file ``name`` and account for it."""
        global total_files
        global total_inodes
        pathlib.Path(name).touch()
        total_files += 1
        # NOTE(review): assumes each touch creates a new file (the directory
        # is emptied between runs); an already existing file is counted again.
        total_inodes += 1

    # establish Test directory
    try:
        os.mkdir(test_path)
    except FileExistsError:
        # Left behind (emptied) by a previous cleaning run: reuse it and do
        # not count it again. Any other failure is allowed to propagate
        # instead of surfacing later as a confusing touch() error.
        pass
    else:
        total_dirs += 1
        total_inodes += 1

    # populate test directory
    for first_char in alphanum:
        one_char_name = test_path + DIR_SEPARATOR + first_char
        touch(one_char_name)
        for second_char in alphanum:
            two_char_name = one_char_name + second_char
            touch(two_char_name)
            for third_char in alphanum:
                touch(two_char_name + third_char)
# Benchmark driver: populate the test directory, time one cleaning pass of
# each implementation, then report the elapsed times and the totals.
# time.perf_counter() is used instead of datetime.now() because wall-clock
# time can jump (NTP, DST) while perf_counter is monotonic.

# prepare test area
populate_files(TEST_DIR)

# Exercise the os.listdir based routine
old_start = time.perf_counter()
clean_output_dir_original(TEST_DIR, [])
old_end = time.perf_counter()

# prepare test area again: the first pass emptied it
populate_files(TEST_DIR)

# Exercise the os.scandir based routine
new_start = time.perf_counter()
clean_output_dir_new(TEST_DIR, [])
new_end = time.perf_counter()

# test results; timedelta keeps the H:MM:SS.ffffff output format
old_elapsed = timedelta(seconds=old_end - old_start)
new_elapsed = timedelta(seconds=new_end - new_start)
print(f'Original Delta time {old_elapsed}')
print(f'Latest Delta time {new_elapsed}')
print(f'Total inodes: {total_inodes}')
print(f'Total Files {total_files}')
print(f'Total Dirs {total_dirs}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment