Skip to content

Instantly share code, notes, and snippets.

@egberts
Created July 4, 2024 18:29
Show Gist options
  • Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.
Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.
clean_output_dir()/utils.py performance test for getpelican/pelican repo
from __future__ import annotations

import logging
import os
import pathlib
import shutil
import time
from collections.abc import Iterable
from datetime import datetime as dt
from datetime import timedelta
# Logging for the benchmark: plain "<logger name>: <message>" lines, INFO and up.
LOG_FORMAT = "%(name)s: %(message)s"
level = logging.INFO
logging.basicConfig(
    level=level,
    format=LOG_FORMAT,
    # NOTE: datefmt only takes effect once %(asctime)s is part of LOG_FORMAT.
    datefmt="[%H:%M:%S]",
    # No custom handler is installed; basicConfig's default stream handler
    # is sufficient for this benchmark.
)
logger = logging.getLogger('os-listdir')
logger.setLevel(level)
def clean_output_dir_original(path: str, retention: Iterable[str]) -> None:
    """Remove everything inside *path* except the names listed in *retention*.

    This is the ``os.listdir`` / ``os.path`` based variant; it is the
    baseline that the benchmark in this file compares against.

    Args:
        path: Output directory to empty. Nothing happens if it does not
            exist; if it exists but is not a directory, it is removed itself.
        retention: Names of direct children of *path* that must be kept.
            Any iterable is accepted, including one-shot iterators.

    Deletion failures are logged instead of being raised.
    """
    if not os.path.exists(path):
        logger.info("Directory already removed: %s", path)
        return

    if not os.path.isdir(path):
        try:
            os.remove(path)
        except Exception as e:
            logger.error("Unable to delete file %s; %s", path, e)
        return

    # Materialise once: re-iterating a one-shot iterable for every entry
    # would exhaust it on the first file and silently drop the retention list.
    retained = frozenset(retention)

    # remove existing content from output folder unless in retention list
    for filename in os.listdir(path):
        file = os.path.join(path, filename)
        if filename in retained:
            logger.info(
                "Skipping deletion; %s is on retention list: %s", filename, file
            )
        elif os.path.islink(file) or os.path.isfile(file):
            # Links are tested before directories: os.path.isdir() follows
            # symlinks and shutil.rmtree() refuses to operate on a symlink,
            # so a link to a directory must be unlinked, not rmtree'd.
            try:
                os.remove(file)
                logger.debug("Deleted file/link %s", file)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", file, e)
        elif os.path.isdir(file):
            try:
                shutil.rmtree(file)
                logger.info("Deleted directory %s", file)
            except Exception as e:
                logger.error("Unable to delete directory %s; %s", file, e)
        else:
            logger.error("Unable to delete %s, file type unknown", file)
def clean_output_dir_new(path: str, retention: Iterable[str]) -> None:
    """Remove everything inside *path* except the names listed in *retention*.

    This is the ``os.scandir`` based variant: the entry type is taken from
    the ``DirEntry`` objects instead of calling ``os.path`` checks per entry.

    Args:
        path: Output directory to empty. Nothing happens if it does not
            exist; if it exists but is not a directory, it is removed itself.
        retention: Names of direct children of *path* that must be kept.
            Any iterable is accepted, including one-shot iterators.

    Deletion failures are logged instead of being raised.
    """
    if not os.path.exists(path):
        logger.debug("Directory already removed: %s", path)
        return

    if not os.path.isdir(path):
        try:
            os.remove(path)
        except Exception as e:
            logger.error("Unable to delete file %s; %s", path, e)
        return

    # Materialise once: re-iterating a one-shot iterable for every entry
    # would exhaust it on the first file and silently drop the retention list.
    retained = frozenset(retention)

    # Snapshot the listing and close the scandir iterator right away, so the
    # directory handle is released and is not being read while we delete.
    with os.scandir(path) as scan_it:
        entries = list(scan_it)

    # remove existing content from output folder unless in retention list
    for dir_entry in entries:
        file = dir_entry.path
        if dir_entry.name in retained:
            logger.debug(
                "Skipping deletion; %s is on retention list: %s", dir_entry.name, file
            )
        elif dir_entry.is_dir(follow_symlinks=False):
            # follow_symlinks=False: a symlink to a directory must not land
            # here, because shutil.rmtree() refuses to operate on symlinks.
            try:
                shutil.rmtree(file)
                logger.debug("Deleted directory %s", file)
            except Exception as e:
                logger.error("Unable to delete directory %s; %s", file, e)
        elif dir_entry.is_file(follow_symlinks=False) or dir_entry.is_symlink():
            try:
                os.remove(file)
                logger.debug("Deleted file/link %s", file)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", file, e)
        else:
            logger.error("Unable to delete %s, file type unknown", file)
# Performance test
alphanum = 'abcdefghijklmnopqrstuvwxyz0123456789'
DIR_SEPARATOR = '/'
TEST_DIR = '/tmp/output'
# Start from a clean slate. On a first run the directory does not exist yet,
# which is fine; any other failure to remove it is a real problem and is
# allowed to propagate.
try:
    shutil.rmtree(TEST_DIR)
except FileNotFoundError:
    pass
# Running totals, updated by populate_files() and reported at the end.
total_files = 0
total_dirs = 0
total_inodes = 0
def populate_files(test_path):
    """Fill *test_path* with empty files for the clean-up benchmark.

    One file is created for every 1-, 2- and 3-character name that can be
    built from ``alphanum``, all directly inside *test_path*. The directory
    itself is created when it does not exist yet.

    The module-level counters ``total_files``, ``total_dirs`` and
    ``total_inodes`` are updated for everything that is created.
    """
    global total_dirs
    global total_inodes

    def touch(directory, name):
        """Create one empty file and account for it in the counters."""
        global total_files
        global total_inodes
        (directory / name).touch()
        total_files += 1
        total_inodes += 1

    # establish Test directory
    root = pathlib.Path(test_path)
    try:
        os.mkdir(root)
    except FileExistsError:
        # Left over from an earlier run or an earlier call; reuse it and do
        # not count it again.
        pass
    else:
        total_dirs += 1
        total_inodes += 1

    # populate test directory
    for first in alphanum:
        touch(root, first)
        for second in alphanum:
            touch(root, first + second)
            for third in alphanum:
                touch(root, first + second + third)
def _timed_cleanup(cleaner):
    """Populate TEST_DIR, run *cleaner* on it and return the elapsed time.

    Only the call to *cleaner* is measured. time.perf_counter() is used
    because it is monotonic; datetime.now() follows the wall clock, which
    can be adjusted while the benchmark is running.
    """
    # prepare test area (not part of the measurement)
    populate_files(TEST_DIR)
    # start performance test
    started = time.perf_counter()
    # Exercise target routine
    cleaner(TEST_DIR, [])
    # end performance test
    return timedelta(seconds=time.perf_counter() - started)


# Both routines run against an identically populated directory.
# (The *_utc names are historical: the values are elapsed durations.)
old_delta_utc = _timed_cleanup(clean_output_dir_original)
new_delta_utc = _timed_cleanup(clean_output_dir_new)
# test results
print(f'Original Delta time {old_delta_utc}')
print(f'Latest Delta time {new_delta_utc}')
print(f'Total inodes: {total_inodes}')
print(f'Total Files {total_files}')
print(f'Total Dirs {total_dirs}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment