Last active
February 26, 2024 15:17
-
-
Save arabcoders/0422e94f818503130e21aa99b2f50d62 to your computer and use it in GitHub Desktop.
A Python script to rename files with history support.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- encoding: UTF8 -*-
#
# Copyright (c) 2024 ArabCoders
# Licensed under the MIT License
#
import argparse
import json
import logging
import os
import re
import sqlite3
import sys
import textwrap
from argparse import RawDescriptionHelpFormatter
from datetime import datetime
from os.path import relpath as rp

import yaml
class IndentDumper(yaml.Dumper):
    """
    A ``yaml.Dumper`` whose ``increase_indent`` ignores the ``indentless``
    argument it receives and always forwards ``False`` to the base class.
    """

    def increase_indent(self, flow=False, indentless=False):
        # The caller-supplied ``indentless`` value is deliberately discarded.
        # NOTE(review): presumably this keeps block sequences indented under
        # their parent key in the dumped output - confirm against PyYAML.
        return super().increase_indent(flow, False)
class Renamer:
    """
    A filename renamer using regular expressions. With the ability to
    reverse the changes and save the history.

    Every executed operation is (optionally) stored in a SQLite "history"
    table so it can be listed, inspected and reversed later on.
    """

    # Version number.
    version: str = '0.1.0'

    # Regex pattern for the search/replace format 'MODE/SEARCH/REPLACE/FLAGS'.
    rgRx: re.Pattern = re.compile(r'(?P<mode>\w{1,2}?)/(?P<rg>(?P<search>.*?)/(?P<replace>.*?)/(?P<flags>\w{0,2}))')

    # Windows invalid characters regex pattern.
    windowsRX: re.Pattern = re.compile(r"[/\\?%*:|\"<>\x7F\x00-\x1F]")

    # History database connection (set by initialize_db, cleared by close).
    db: sqlite3.Connection = None

    # Logger instance.
    logger: logging.Logger = None

    def __init__(self, logger: logging.Logger, db_file: str) -> None:
        """
        Initialize the Renamer.

        :param logger: Logger instance.
        :param db_file: Database file path.
        """
        self.logger = logger
        self.initialize_db(db_file=db_file)

    def initialize_db(self, db_file: str) -> None:
        """
        Open the history database and create its schema when it is missing.

        :param db_file: Database file path. A bare filename and ':memory:' are accepted.
        """
        if not os.path.exists(db_file):
            # dirname is '' for a bare filename (and for ':memory:');
            # os.makedirs('') raises, so only create a real parent directory.
            parent = os.path.dirname(db_file)
            if parent:
                os.makedirs(parent, exist_ok=True)

        self.db = sqlite3.connect(db_file)
        self.db.row_factory = sqlite3.Row

        # Look at the schema instead of relying on "the file did not exist":
        # an existing but empty file would otherwise never get its table.
        has_table = self.db.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'history'"
        ).fetchone()
        if has_table:
            return

        self.logger.debug(f"Initializing history db '{db_file}'.")

        db_commands: list = [
            'CREATE TABLE "history" ("id" integer PRIMARY KEY AUTOINCREMENT, "rx" text NOT NULL, "actions" JSON NOT NULL, "total" numeric NOT NULL, "ts" timestamp NOT NULL)',
        ]

        for command in db_commands:
            self.logger.debug(f"Executing [{command}].")
            self.db.execute(command)

        self.db.commit()

    def close(self) -> None:
        """
        Close the history database connection. Safe to call more than once.
        """
        if self.db is not None:
            self.db.close()
            self.db = None

    def get_history(self, id: int) -> dict:
        """
        Get the history of a given operation.

        :param id: History id.
        :return: History item with the keys id, rx, total, ts and actions.
        :throws ValueError: If no history found for the given id.
        """
        cursor = self.db.cursor()
        cursor.execute('SELECT * from "history" WHERE id = ?', (id,))
        row = cursor.fetchone()

        if not row:
            raise ValueError(f'No history found for id [#{id}].')

        return {
            'id': row['id'],
            'rx': row['rx'],
            'total': int(row['total']),
            'ts': datetime.fromtimestamp(row['ts']),
            # The actions are stored as a JSON encoded list of dicts.
            'actions': json.loads(row['actions']),
        }

    def take_action(self, **kwargs) -> int:
        """
        Take action based on the given arguments.

        :param kwargs: Arguments.
        :return: Exit code.

        kwargs:
            rx: str - Regex pattern.
            inputs: list - Files or paths to rename.
            force: bool - Allow overwriting existing files.
            allow_hidden: bool - Allow processing hidden files.
            safe_for_windows: bool - Make the final filename safe for Windows OS.
            substitute: str - Substitute character for invalid characters.
            no_action: bool - Only print the changes do not execute or save them.
            assume_yes: bool - Assume yes for confirmation to proceed.
            dont_save: bool - Do not save the operation in the history.
            list_history: bool - List history of saved operations.
            view_history: int - Display given operation number.
            reverse: int - Reverse the given operation number.
        """
        if kwargs.get('list_history', False):
            return self.list_history(**kwargs)

        view_id = kwargs.get('view_history')
        if view_id:
            return self.view_history(id=view_id, **kwargs)

        reverse_id = kwargs.get('reverse')
        if reverse_id:
            return self.reverse_history(id=reverse_id, **kwargs)

        return self.rename_files(**kwargs)

    def list_history(self, **kwargs) -> int:
        """
        List the history of saved operations as a text table on stdout.

        :param kwargs: Arguments (unused).
        :return: Exit code.
        """
        rows = self.db.execute('SELECT "id", "total", "ts" from "history" ORDER BY "id" DESC').fetchall()

        if not rows:
            self.logger.info('No records are found.')
            return 0

        sep = '-' * 49

        print(sep)
        print('|' + ' ' * 3 + 'ID' + ' ' * 3 + '| Actions | ' + ' ' * 11 + 'Date' + ' ' * 12 + '|')

        # Each record is preceded by a separator line, and one more closes the table.
        for row in rows:
            print(sep)
            print(f"| #{row['id']!s:<6}| {row['total']!s:<8}| {datetime.fromtimestamp(row['ts'])} |")

        print(sep)

        return 0

    def view_history(self, id: int, **kwargs) -> int:
        """
        View the history of a given operation as YAML on stdout.

        :param id: History id.
        :param kwargs: Arguments (unused).
        :return: Exit code.
        """
        try:
            item = self.get_history(id)
        except ValueError as e:
            self.logger.error(e)
            return 1

        yaml.dump(data=item, stream=sys.stdout, default_flow_style=False,
                  Dumper=IndentDumper, indent=2, sort_keys=False)

        return 0

    def reverse_history(self, id: int, **kwargs) -> int:
        """
        Reverse the given operation.

        :param id: History id.
        :param kwargs: Arguments, forwarded to run().
        :return: Exit code.

        :kwargs:
            force: bool - Allow overwriting existing files.
        """
        try:
            item = self.get_history(id)
        except ValueError as e:
            self.logger.error(e)
            return 1

        force: bool = bool(kwargs.get('force', False))
        actions: list = []
        # Maps every name that is going to be restored to the file restored into it.
        claimed: dict = {}

        for action in item['actions']:
            if action.get('success', False) is False:
                self.logger.warning(f"'{rp(action['old'])}' not reversing: previous operation failed.")
                continue

            if not os.path.exists(action['new']):
                self.logger.warning(f"'{rp(action['new'])}' not reversing: does not exist.")
                continue

            if force is False and os.path.exists(action['old']):
                self.logger.warning(f"'{rp(action['new'])}' not reversing: '{rp(action['old'])}' already exists.")
                continue

            if action['old'] in claimed:
                # Two entries want to be restored to the same name; only the
                # first one is reversed, otherwise they would overwrite each other.
                self.logger.warning(
                    f"'{rp(action['new'])}' not reversing: '{rp(claimed[action['old']])}' shares the same new name '{rp(action['old'])}'.")
                continue

            claimed[action['old']] = action['new']
            actions.append({'old': action['new'], 'new': action['old']})

        if not actions:
            self.logger.warning("No possible candidates found for reversing.")
            return 0

        kwargs['rx'] = item['rx']

        return self.run(actions=actions, **kwargs)

    @staticmethod
    def _translate_replacement(replacement: str) -> str:
        """
        Convert the user facing replace pattern into a Python re template.

        '$N' becomes the back-reference '\\N', while an escaped dollar sign
        ('\\$') becomes a literal '$'. Escaped backslashes are left untouched.

        :param replacement: Replace pattern as given by the user.
        :return: Template suitable for re.Pattern.sub().
        """
        def convert(match) -> str:
            if match.group('num'):
                return '\\' + match.group('num')
            if match.group(0) == '\\$':
                return '$'
            return match.group(0)

        return re.sub(r'\\\\|\\\$|\$(?P<num>\d{1,2})', convert, replacement)

    def _parse_rx(self, rx: str):
        """
        Parse a 'MODE/SEARCH/REPLACE/FLAGS' expression.

        :param rx: Expression to parse.
        :return: Tuple of (compiled search pattern, replace template), or None
                 when the expression is invalid (the reason is logged).
        """
        parts = self.rgRx.match(rx)
        if not parts:
            self.logger.error(f"Invalid search/replace pattern was given '{rx}'.")
            return None

        rx_d = parts.groupdict()

        if not rx_d.get('search'):
            self.logger.error(
                f"Invalid regex pattern was given '{rx}'. No search pattern was given.")
            return None

        mode = rx_d.get('mode')
        if mode not in ('s', 'S'):
            self.logger.error(f"Invalid mode was given '{mode}' only the search mode 's/' is supported.")
            return None

        rx_flags = 0
        for flag in rx_d.get('flags') or '':
            if flag not in ('i', 'g', 'm', 's', 'x', 'A', 'U', 'L', 'M', 'S', 'X'):
                self.logger.error(f"Invalid regular expression flag was given '{flag}'.")
                return None

            if flag == 'g':
                # NOTE(review): 'g' is accepted for compatibility but has no
                # effect, every match in the filename is always replaced.
                continue

            rx_flags |= getattr(re, flag.upper())

        try:
            rx_p = re.compile(rx_d.get('search'), flags=rx_flags)
        except (re.error, ValueError) as e:
            # ValueError covers flag combinations rejected by the re module,
            # e.g. 'L' on a str pattern or 'A' together with 'U'.
            self.logger.error(f"Invalid search pattern was given '{rx_d.get('search')}'. {e}")
            return None

        return rx_p, self._translate_replacement(rx_d.get('replace') or '')

    def rename_files(self, rx: str, inputs: list, **kwargs) -> int:
        """
        Rename files based on the given regex.

        :param rx: Regex pattern.
        :param inputs: Files or paths to rename.
        :param kwargs: Arguments.
        :return: Exit code.

        :kwargs:
            force: bool - Allow overwriting existing files.
            safe_for_windows: bool - Make the final filename safe for windows.
            substitute: str - Substitute character for invalid characters.
            allow_hidden: bool - Allow processing hidden files.
            no_action: bool - Only print the changes do not execute or save them.
            assume_yes: bool - Assume yes for confirmation to proceed.
            dont_save: bool - Do not save the operation in the history.
        """
        if not rx:
            self.logger.error("Please provide a search/replace pattern.")
            return 1

        if not inputs:
            self.logger.error("Please provide file or path to take action on.")
            return 1

        force: bool = bool(kwargs.get('force', False))
        safeWindows: bool = bool(kwargs.get('safe_for_windows', False))
        safeWindowsSubstitute: str = kwargs.get('substitute', '')
        allowHidden: bool = bool(kwargs.get('allow_hidden', False))

        parsed = self._parse_rx(rx)
        if parsed is None:
            return 1
        rx_p, rx_r = parsed

        files: list = []
        for path in inputs:
            if not os.path.exists(path):
                self.logger.error(f"'{path}' does not exist.")
                return 1
            files.extend(self.get_files(path, allow_hidden=allowHidden))

        actions: list = []
        # Maps every planned new name to the file that is going to take it.
        claimed: dict = {}

        for file in files:
            file_path, file_name = os.path.split(file)

            if not rx_p.search(file_name):
                self.logger.debug(f"'{file_name}' not renamed: does not match the regex.")
                continue

            try:
                new_base = rx_p.sub(rx_r, file_name)
            except (re.error, IndexError) as e:
                # Raised for references to groups the search pattern does not define.
                self.logger.error(f"Invalid replace pattern was given '{rx_r}'. {e}")
                return 1

            if safeWindows:
                new_base = self.safe_windows_filename(filename=new_base, substitute=safeWindowsSubstitute)

            if not new_base:
                self.logger.warning(f"'{rp(file_name)}' not renamed: the new name is empty.")
                continue

            new_name = os.path.join(file_path, new_base)

            # Checked before the "already exists" test, as an unchanged name always exists.
            if new_name == file:
                self.logger.debug(f"'{file_name}' not renamed: already matches.")
                continue

            if force is False and os.path.exists(new_name):
                self.logger.warning(f"'{rp(file_name)}' not renamed: '{rp(new_name)}' already exists.")
                continue

            if new_name in claimed:
                # Renaming both files would make the later one overwrite the earlier one.
                self.logger.warning(
                    f"'{rp(file_name)}' not renamed: '{rp(claimed[new_name])}' shares the same new name '{rp(new_name)}'.")
                continue

            claimed[new_name] = file
            actions.append({'old': file, 'new': new_name})

        if not actions:
            self.logger.warning(f"No possible candidates found for renaming that matches '{rx_p.pattern}'.")
            return 0

        return self.run(rx=rx, actions=actions, **kwargs)

    def run(self, actions: list, **kwargs) -> int:
        """
        Run the renaming and save the history.

        :param actions: List of actions, each a dict with the keys 'old' and 'new'.
                        A 'success' flag (and 'error' on failure) is added to each.
        :param kwargs: Arguments.
        :return: Exit code.

        :kwargs:
            rx: str - Regex pattern.
            force: bool - Allow overwriting existing files.
            no_action: bool - Only print the changes do not execute or save them.
            assume_yes: bool - Assume yes for confirmation to proceed.
            dont_save: bool - Do not save the operation in the history.
        """
        rx: str = kwargs.get('rx')
        force: bool = bool(kwargs.get('force', False))
        noAction: bool = bool(kwargs.get('no_action', False))
        assumeYes: bool = bool(kwargs.get('assume_yes', False))
        dontSave: bool = bool(kwargs.get('dont_save', False))

        if assumeYes is False or noAction is True:
            for rec in actions:
                print(f"Renaming '{rp(rec['old'])}' => '{rp(rec['new'])}'.")

        if noAction:
            return 0

        if assumeYes is False and not self.confirm():
            return 0

        # os.rename cannot overwrite an existing file on Windows, os.replace
        # overwrites on every platform, which is what 'force' asks for.
        move = os.replace if force else os.rename

        for item in actions:
            try:
                move(item['old'], item['new'])
            except (OSError, ValueError) as e:
                # Best effort: record the failure and carry on with the rest.
                item['error'] = str(e)
                item['success'] = False
                self.logger.error(f"'{rp(item['old'])}' not renamed: {e}")
            else:
                self.logger.info(f"'{rp(item['old'])}' renamed: '{rp(item['new'])}'.")
                item['success'] = True

        if dontSave is False:
            cursor = self.db.cursor()
            cursor.execute(
                'INSERT INTO "history" ("rx", "actions", "total", "ts") VALUES (?, ?, ?, ?)',
                (rx, json.dumps(actions, ensure_ascii=False), len(actions), datetime.now().timestamp(),)
            )
            self.db.commit()
            self.logger.info(f"Operation log saved as id [#{cursor.lastrowid}].")

        return 0

    def get_files(self, path: str, allow_hidden: bool = False) -> list:
        """
        Get the list of files in the given path.

        Directories are not descended into. A path that is a file is returned
        as is, regardless of allow_hidden.

        :param path: Path to search for files.
        :param allow_hidden: Allow processing hidden files.
        :return: List of real paths in natural (human) sort order.
        """
        if os.path.isfile(path):
            return [os.path.realpath(path)]

        files = []
        for entry in os.listdir(path):
            if allow_hidden is False and entry.startswith('.'):
                continue

            full_name = os.path.realpath(os.path.join(path, entry))
            if os.path.isfile(full_name):
                files.append(full_name)

        def natural_key(full_name: str) -> list:
            # re.split with a capturing group always alternates text, digits,
            # text, ... so the position tells a number from text. Testing with
            # str.isdigit() would also convert non-ASCII digit chunks, which
            # breaks the ordering between int and str.
            chunks = re.split('([0-9]+)', full_name)
            return [int(chunk) if idx % 2 else chunk.lower() for idx, chunk in enumerate(chunks)]

        return sorted(files, key=natural_key)

    def confirm(self) -> bool:
        """
        Confirm the action to proceed.

        Keeps asking until 'y' or 'n' is entered. An interrupt or a closed
        stdin counts as a refusal.

        :return: True if confirmed, False otherwise.
        """
        while True:
            try:
                answer = input("Continue? [y/n]: ").strip().lower()
            except (KeyboardInterrupt, EOFError):
                return False

            if answer in ('y', 'n'):
                return answer == 'y'

            print("\n Invalid option was entered. Please enter 'y' or 'n' to proceed.")

    def safe_windows_filename(self, filename: str, substitute: str = '') -> str:
        """
        Make the filename safe for windows.

        We may want to check for special filenames such as 'CON', 'PRN', 'AUX', 'NUL', 'COM1', 'COM2', 'COM3', 'COM4',
        in the future. if we want to make it more strict.

        :param filename: Filename to make safe.
        :param substitute: Substitute character for invalid characters.
        :return: Safe filename.
        """
        # A callable replacement makes the substitute literal text, so a
        # backslash in it is not parsed as a regex template escape.
        return self.windowsRX.sub(lambda _match: substitute, filename)

    @staticmethod
    def cli() -> int:
        """
        CLI interface for the class.

        Parses sys.argv, configures logging and dispatches to take_action().

        :return: Exit code.
        """
        # Backslashes are doubled so the text holds no invalid escape
        # sequences; the rendered help is the same as before.
        description = textwrap.dedent('''\
            Filenames Renamer using regular expressions.
            The pattern is in the format of 'MODE/SEARCH/REPLACE/FLAGS'.
            The supported modes are:
            - s: Search and replace mode.
            The flags are optional and can be any combination of the following:
            - i: case-insensitive.
            - g: global match.
            - m: multi-line match.
            - s: dot matches newline.
            - x: free-spacing.
            - A: ASCII-only matching.
            - U: Unicode matching.
            - L: Locale dependent matching.
            - M: Multi-line mode.
            - S: DOTALL mode.
            - X: VERBOSE mode.
            The search pattern is a regular expression pattern it uses the Python regular expression syntax.
            For the replace pattern, you can use the following special sequences:
            - \\g<name>: The substring matched by the group named name.
            - \\g<number>: The substring matched by the group number.
            - \\g<N>: The substring matched by the N group.
            - \\N: The substring matched by the N match.
            - $N: The substring matched by the N match. < this one is special will be replaced with \\N.
            ** Note: the dollar sign $ is special case. and if you want to use it as part of the replace pattern you need to escape it with a backslash.
            Examples:
            - renamer 's/\\.txt/\\.md/' file1.txt file2.txt
            - renamer -sfw 's/(\\d+) Tv Show Season (\\d+) ep(\\d+)/TV Show S0$2E$3 - $1/'
            ''')

        parser = argparse.ArgumentParser(
            description=description,
            epilog=f'renamer v{Renamer.version} - ArabCoders',
            formatter_class=RawDescriptionHelpFormatter
        )

        # 'or' also covers XDG_CONFIG_HOME being set to an empty string.
        defaultDir = os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config')
        dbFile = os.path.join(defaultDir, 'renamer', 'history.sqlite')

        opt_input = parser.add_argument_group('Input')
        opt_input.add_argument("rx", type=str, nargs='?',
                               help="Regex to be used for renaming.")
        opt_input.add_argument("inputs", type=str, nargs='*',
                               help="Files or paths to rename.", default=[os.getcwd()])
        opt_input.add_argument('-f', '--force', action='store_true',
                               help='Allow overwriting existing files.', default=False)
        opt_input.add_argument('--allow-hidden', action='store_true',
                               help='Allow processing hidden files.', default=False)

        opt_config = parser.add_argument_group('Configuration')
        opt_config.add_argument('-V', '--version', action='store_true',
                                help='Show version number.', default=False)
        opt_config.add_argument('-d', '--db-file', type=str,
                                help='Database file', default=dbFile)

        opt_output = parser.add_argument_group('Output')
        opt_output.add_argument('-sfw', '--safe-for-windows', action='store_true',
                                help='Make the final filename safe for Windows OS.', default=False)
        opt_output.add_argument('-S', '--substitute', type=str,
                                help='Substitute character for invalid characters.', default='')
        opt_output.add_argument('-n', '--no-action', action='store_true',
                                help='Print what will happen do not take action.', default=False)
        opt_output.add_argument('-y', '--assume-yes', action='store_true',
                                help='Assume yes for confirmation to proceed.', default=False)

        opt_log = parser.add_argument_group('Logging')
        opt_log.add_argument('-v', '--verbose', action='store_true',
                             help='Verbose mode.', default=False)
        opt_log.add_argument("-L", "--log", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                             help="logging level.", default='INFO')

        rev_grp = parser.add_argument_group('History')
        rev_grp.add_argument('-r', '--reverse', type=int,
                             help='Reverse the operation using the given id.')
        rev_grp.add_argument('-l', '--list-history', action='store_true', default=False,
                             help='List operations log.')
        rev_grp.add_argument('-k', '--view-history', type=int,
                             help='Display information about the operation by id.')
        rev_grp.add_argument('-D', '--dont-save', action='store_true', default=False,
                             help='Do not save the operation log in db.')

        args = parser.parse_args()

        if args.version:
            print(f"v{Renamer.version}", file=sys.stdout)
            return 0

        args.log = 'DEBUG' if args.verbose else args.log
        logLevel = getattr(logging, args.log.upper(), None)

        # coloredlogs is optional, fall back to the plain logging setup.
        try:
            import coloredlogs
            coloredlogs.install(
                level=logLevel, fmt='%(asctime)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')
        except ImportError:
            logging.basicConfig(
                level=logLevel, format="%(asctime)s %(levelname)s: %(message)s", datefmt='%H:%M:%S')

        renamer = Renamer(
            logger=logging.getLogger('Renamer'),
            db_file=args.db_file
        )

        try:
            return renamer.take_action(**vars(args))
        finally:
            renamer.close()
if __name__ == '__main__':
    # sys.exit is used instead of the bare ``exit`` builtin: ``exit`` is only
    # injected by the ``site`` module and is missing under ``python -S`` or in
    # frozen/embedded interpreters.
    sys.exit(Renamer.cli())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment