Skip to content

Instantly share code, notes, and snippets.

@arabcoders
Last active February 26, 2024 15:17
Show Gist options
  • Save arabcoders/0422e94f818503130e21aa99b2f50d62 to your computer and use it in GitHub Desktop.
Save arabcoders/0422e94f818503130e21aa99b2f50d62 to your computer and use it in GitHub Desktop.
A Python script to rename files with history support.
#!/usr/bin/env python3
# -*- encoding: UTF8 -*-
#
# Copyright (c) 2024 ArabCoders
# Licensed under the MIT License
#
from argparse import RawDescriptionHelpFormatter
import logging
import sqlite3
import os
import sys
import re
from datetime import datetime
import yaml
import json
from os.path import relpath as rp
class IndentDumper(yaml.Dumper):
    """YAML dumper that never emits 'indentless' block sequences."""

    def increase_indent(self, flow=False, indentless=False):
        # The caller's ``indentless`` value is deliberately ignored and
        # ``False`` is always forwarded to the base implementation.
        return super().increase_indent(flow, False)
class Renamer:
    """
    Regular-expression based filename renamer.

    Operations can be saved to a history so they can be reviewed or
    reversed later.
    """

    # Version number.
    version: str = '0.1.0'

    # Regex pattern for the search/replace format (MODE/SEARCH/REPLACE/FLAGS).
    rgRx: re.Pattern = re.compile(
        r'(?P<mode>\w{1,2}?)/(?P<rg>(?P<search>.*?)/(?P<replace>.*?)/(?P<flags>\w{0,2}))'
    )

    # Windows invalid characters regex pattern.
    windowsRX: re.Pattern = re.compile(r"[/\\?%*:|\"<>\x7F\x00-\x1F]")

    # History database connection.
    db: sqlite3.Connection = None

    # Logger instance.
    logger: logging.Logger = None
def __init__(self, logger: logging.Logger, db_file: str) -> None:
    """
    Set up the renamer: keep the logger and open the history database.

    :param logger: Logger instance.
    :param db_file: Database file path.
    """
    self.logger = logger
    self.initialize_db(db_file)
def initialize_db(self, db_file: str) -> None:
    """
    Open the history database and create its schema on first use.

    The connection is stored on ``self.db`` with ``sqlite3.Row`` as the row
    factory. The schema is only created when ``db_file`` did not exist
    before the connection was opened.

    :param db_file: Database file path.
    """
    is_new: bool = not os.path.exists(db_file)
    if is_new:
        # A bare file name (or ':memory:') has no directory component and
        # os.makedirs('') raises FileNotFoundError, so only create the
        # parent directory when there actually is one.
        parent = os.path.dirname(db_file)
        if parent:
            os.makedirs(parent, exist_ok=True)

    self.db = sqlite3.connect(db_file)
    self.db.row_factory = sqlite3.Row

    if not is_new:
        return

    self.logger.debug(f"Initializing history db '{db_file}'.")

    db_commands: list = [
        'CREATE TABLE "history" ("id" integer PRIMARY KEY AUTOINCREMENT, "rx" text NOT NULL, "actions" JSON NOT NULL, "total" numeric NOT NULL, "ts" timestamp NOT NULL)',
    ]

    for command in db_commands:
        self.logger.debug(f"Executing [{command}].")
        self.db.execute(command)
def get_history(self, id: int) -> dict:
    """
    Load a single saved operation from the history table.

    :param id: History id.
    :return: History item with the keys 'id', 'rx', 'total', 'ts' and
        'actions' (the latter decoded from its JSON representation).
    :throws ValueError: If no history found for the given id.
    """
    row = self.db.cursor().execute('SELECT * from "history" WHERE id = ?', (id,)).fetchone()
    if not row:
        raise ValueError(f'No history found for id [#{id}].')

    return {
        'id': row['id'],
        'rx': row['rx'],
        'total': int(row['total']),
        'ts': datetime.fromtimestamp(row['ts']),
        'actions': json.loads(row['actions']),
    }
def take_action(self, **kwargs) -> int:
    """
    Dispatch to the operation selected by the given arguments.

    History listing wins over viewing, viewing over reversing, and when
    none of them is requested the files are renamed.

    :param kwargs: Arguments.
    :return: Exit code.

    kwargs:
        rx: str - Regex pattern.
        inputs: list - Files or paths to rename.
        force: bool - Allow overwriting existing files.
        allow_hidden: bool - Allow processing hidden files.
        safe_for_windows: bool - Make the final filename safe for Windows OS.
        substitute: str - Substitute character for invalid characters.
        no_action: bool - Only print the changes do not execute or save them.
        assume_yes: bool - Assume yes for confirmation to proceed.
        dont_save: bool - Do not save the operation in the history.
        list_history: bool - List history of saved operations.
        view_history: int - Display given operation number.
        reverse: int - Reverse the given operation number.
    """
    if kwargs.get('list_history', False):
        return self.list_history(**kwargs)

    # Options that carry a history id, in order of precedence.
    by_id = (
        ('view_history', self.view_history),
        ('reverse', self.reverse_history),
    )
    for option, handler in by_id:
        history_id = kwargs.get(option, False)
        if history_id:
            return handler(id=history_id, **kwargs)

    return self.rename_files(**kwargs)
def list_history(self, **kwargs) -> int:
    """
    Print a table of the saved operations, newest first.

    :param kwargs: Arguments.
    :return: Exit code.
    """
    query = 'SELECT "id", "total", "ts" from "history" ORDER BY "id" DESC'
    records = [
        {'id': r['id'], 'total': r['total'], 'ts': r['ts']}
        for r in self.db.cursor().execute(query)
    ]

    if not records:
        self.logger.info('No records are found.')
        return 0

    rule = '-' * 49

    print(rule)
    print('|' + ' ' * 3 + 'ID' + ' ' * 3 + '| Actions | ' + ' ' * 11 + 'Date' + ' ' * 12 + '|')
    print(rule)

    for rec in records:
        id_cell = str(rec['id']).ljust(6)
        total_cell = str(rec['total']).ljust(8)
        when = datetime.fromtimestamp(rec['ts'])
        print('| #' + id_cell + '| ' + total_cell + f"| {when} |")
        # Every row, including the last one, is closed by a separator line.
        print(rule)

    return 0
def view_history(self, id: int, **kwargs) -> int:
    """
    Dump a saved operation to stdout as YAML.

    :param id: History id.
    :param kwargs: Arguments.
    :return: Exit code, 0 when the item was printed and 1 when no history
        exists for the given id.
    """
    try:
        item = self.get_history(id)
    except ValueError as e:
        self.logger.error(e)
        return 1

    yaml.dump(data=item, stream=sys.stdout, default_flow_style=False,
              Dumper=IndentDumper, indent=2, sort_keys=False)

    # Return an explicit exit code like every other operation does instead
    # of implicitly returning None.
    return 0
def reverse_history(self, id: int, **kwargs) -> int:
    """
    Undo a saved operation by renaming its files back.

    Entries that failed originally, whose renamed file is gone, whose
    original name is taken again (unless forced) or that would clash with
    another entry being reversed are skipped with a warning.

    :param id: History id.
    :param kwargs: Arguments.
    :return: Exit code.

    :kwargs:
        force: bool - Allow overwriting existing files.
    """
    try:
        record = self.get_history(id)
    except ValueError as err:
        self.logger.error(err)
        return 1

    overwrite: bool = bool(kwargs.get('force', False))

    pending = []
    for entry in record['actions']:
        # A missing 'success' key is treated like an explicit failure.
        if entry.get('success', False) is False:
            self.logger.warning(f"'{rp(entry['old'])}' not reversing: previous operation failed.")
            continue

        current, original = entry['new'], entry['old']

        if not os.path.exists(current):
            self.logger.warning(f"'{rp(current)}' not reversing: does not exist.")
            continue

        if not overwrite and os.path.exists(original):
            self.logger.warning(f"'{rp(current)}' not reversing: '{rp(original)}' already exists.")
            continue

        clashes = [p for p in pending if p['new'] == original]
        if clashes:
            for p in clashes:
                self.logger.warning(
                    f"'{rp(current)}' not reversing: '{rp(p['old'])}' shares the same new name '{rp(original)}'.")
            continue

        pending.append({'old': current, 'new': original})

    if not pending:
        self.logger.warning("No possible candidates found for reversing.")
        return 0

    # The reversal is run (and saved) under the pattern of the original operation.
    kwargs['rx'] = record['rx']

    return self.run(actions=pending, **kwargs)
def rename_files(self, rx: str, inputs: list, **kwargs) -> int:
    """
    Rename files based on the given 'MODE/SEARCH/REPLACE/FLAGS' expression.

    The expression is validated, the candidate files are collected from
    ``inputs`` and the resulting list of renames is handed over to ``run``.
    Without the ``g`` flag only the first match in a file name is replaced,
    with it every match is replaced.

    :param rx: Regex pattern.
    :param inputs: Files or paths to rename.
    :param kwargs: Arguments.
    :return: Exit code.

    :kwargs:
        force: bool - Allow overwriting existing files.
        safe_for_windows: bool - Make the final filename safe for windows.
        substitute: str - Substitute character for invalid characters.
        allow_hidden: bool - Allow processing hidden files.
        no_action: bool - Only print the changes do not execute or save them.
        assume_yes: bool - Assume yes for confirmation to proceed.
        dont_save: bool - Do not save the operation in the history.
    """
    # Report failures through the exit code, like the rest of this method,
    # instead of terminating the interpreter from inside the class.
    if not rx:
        self.logger.error("Please provide a search/replace pattern.")
        return 1

    if not inputs:
        self.logger.error("Please provide file or path to take action on.")
        return 1

    force: bool = bool(kwargs.get('force', False))
    safe_windows: bool = bool(kwargs.get('safe_for_windows', False))
    safe_windows_substitute: str = kwargs.get('substitute', '')
    allow_hidden: bool = bool(kwargs.get('allow_hidden', False))

    parsed = self.rgRx.match(rx)
    if not parsed:
        self.logger.error(f"Invalid search/replace pattern was given '{rx}'.")
        return 1

    rx_d = parsed.groupdict()

    if not rx_d.get('search'):
        self.logger.error(
            f"Invalid regex pattern was given '{rx}'. No search pattern was given.")
        return 1

    mode = rx_d.get('mode')
    if mode not in ('s', 'S'):
        self.logger.error(f"Invalid mode was given '{mode}' only the search mode 's/' is supported.")
        return 1

    rx_flags = 0
    global_mode = False
    for flag in rx_d.get('flags') or '':
        if flag not in ('i', 'g', 'm', 's', 'x', 'A', 'U', 'L', 'M', 'S', 'X'):
            self.logger.error(f"Invalid regular expression flag was given '{flag}'.")
            return 1

        # 'g' is not a Python regex flag, it only controls how many
        # matches are replaced.
        if flag == 'g':
            global_mode = True
            continue

        rx_flags |= getattr(re, flag.upper())

    try:
        rx_p = re.compile(rx_d.get('search'), flags=rx_flags)
    except (re.error, ValueError) as e:
        # ValueError is raised for flags that cannot be used with a str
        # pattern (e.g. 'L') or that are mutually exclusive ('A' + 'U').
        self.logger.error(f"Invalid search pattern was given '{rx_d.get('search')}'. {e}")
        return 1

    rx_r = rx_d.get('replace') or ''
    if rx_r:
        # replace $N with \N
        rx_r = re.sub(r'(?<!\\)\$(\d{1,2})', r'\\\1', rx_r)
        # An escaped dollar sign is a literal '$'. Left as '\$' the
        # backslash would end up in the new file name.
        rx_r = rx_r.replace('\\$', '$')

    files: list = []
    for path in inputs:
        if not os.path.exists(path):
            self.logger.error(f"'{path}' does not exist.")
            return 1

        files.extend(self.get_files(path, allow_hidden=allow_hidden))

    # re.sub: count=0 replaces every match, count=1 only the first one.
    count = 0 if global_mode else 1

    actions: list = []
    planned: dict = {}  # new name -> old name, for duplicate target detection.

    for file in files:
        file_path, file_name = os.path.split(file)

        if not rx_p.search(file_name):
            self.logger.debug(f"'{file_name}' not renamed: does not match the regex.")
            continue

        try:
            new_base = rx_p.sub(rx_r, file_name, count=count)
        except (re.error, IndexError) as e:
            # Invalid group reference or unknown group name in the template.
            self.logger.error(f"Invalid replace pattern was given '{rx_d.get('replace')}'. {e}")
            return 1

        if safe_windows:
            new_base = self.safe_windows_filename(filename=new_base, substitute=safe_windows_substitute)

        if not new_base:
            self.logger.warning(f"'{rp(file_name)}' not renamed: the resulting name is empty.")
            continue

        new_name = os.path.join(file_path, new_base)

        # Must be checked before the existence test, otherwise an unchanged
        # name is reported as a conflict with the file itself.
        if new_name == file:
            self.logger.debug(f"'{file_name}' not renamed: already matches.")
            continue

        if not force and os.path.exists(new_name):
            self.logger.warning(f"'{rp(file_name)}' not renamed: '{rp(new_name)}' already exists.")
            continue

        if new_name in planned:
            self.logger.warning(
                f"'{rp(file_name)}' not renamed: '{rp(planned[new_name])}' shares the same new name '{rp(new_name)}'.")
            continue

        planned[new_name] = file
        actions.append({'old': file, 'new': new_name})

    if not actions:
        self.logger.warning(f"No possible candidates found for renaming that matches '{rx_d.get('search')}'.")
        return 0

    return self.run(rx=rx, actions=actions, **kwargs)
def run(self, actions: list, **kwargs) -> int:
    """
    Execute the given renames and store the outcome in the history.

    Each action is updated in place with a 'success' flag (and an 'error'
    message when the rename failed) before the operation is saved.

    :param actions: List of actions.
    :param kwargs: Arguments.
    :return: Exit code.

    :kwargs:
        rx: str - Regex pattern.
        no_action: bool - Only print the changes do not execute or save them.
        assume_yes: bool - Assume yes for confirmation to proceed.
        dont_save: bool - Do not save the operation in the history.
    """
    rx: str = kwargs.get('rx')
    dry_run: bool = bool(kwargs.get('no_action', False))
    auto_confirm: bool = bool(kwargs.get('assume_yes', False))
    skip_history: bool = bool(kwargs.get('dont_save', False))

    # The plan is shown whenever the user still has to confirm it or when
    # nothing is going to be executed.
    if dry_run or not auto_confirm:
        for rec in actions:
            print(f"Renaming '{rp(rec['old'])}' => '{rp(rec['new'])}'.")

    if dry_run:
        return 0

    if not auto_confirm and not self.confirm():
        return 0

    for item in actions:
        try:
            os.rename(item['old'], item['new'])
            self.logger.info(f"'{rp(item['old'])}' renamed: '{rp(item['new'])}'.")
            item['success'] = True
        except Exception as e:
            item.update(error=str(e), success=False)
            self.logger.error(f"'{rp(item['old'])}' not renamed: {e}")

    if skip_history:
        return 0

    cursor = self.db.cursor()
    cursor.execute(
        'INSERT INTO "history" ("rx", "actions", "total", "ts") VALUES (?, ?, ?, ?)',
        (rx, json.dumps(actions, ensure_ascii=False), len(actions), datetime.now().timestamp(),)
    )
    self.db.commit()
    self.logger.info(f"Operation log saved as id [#{cursor.lastrowid}].")

    return 0
def get_files(self, path: str, allow_hidden: bool = False) -> list:
    """
    Get the list of files in the given path.

    A path that is itself a file is returned as a single-item list. For a
    directory only its direct children that are files are returned (no
    recursion), as resolved real paths in natural order so that 'file2'
    sorts before 'file10'.

    :param path: Path to search for files.
    :param allow_hidden: Allow processing hidden files.
    :return: List of files.
    """
    if os.path.isfile(path):
        return [os.path.realpath(path)]

    files = []
    for name in os.listdir(path):
        if not allow_hidden and name.startswith('.'):
            continue

        full_name = os.path.realpath(os.path.join(path, name))
        if os.path.isfile(full_name):
            files.append(full_name)

    def natural_key(name: str) -> list:
        # re.split() with a capturing group alternates text, digits, text...
        # so odd positions are always runs of ASCII digits. Deciding by
        # position instead of str.isdigit() keeps non-ASCII digits (e.g.
        # '²', which int() rejects) from being converted, and keeps every
        # key position the same type so the keys stay comparable.
        return [int(token) if index % 2 else token.lower()
                for index, token in enumerate(re.split(r'([0-9]+)', name))]

    return sorted(files, key=natural_key)
def confirm(self) -> bool:
    """
    Ask the user to confirm the action on stdin.

    The question is repeated until 'y' or 'n' (case-insensitive) is
    entered. Interrupting the prompt or reaching the end of the input is
    treated as a refusal.

    :return: True if confirmed, False otherwise.
    """
    # A loop instead of recursion, so repeated invalid answers cannot
    # exhaust the call stack.
    while True:
        try:
            answer = input("Continue? [y/n]: ").strip().lower()
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin is closed or exhausted (e.g. piped input).
            return False

        if answer in ('y', 'n'):
            return answer == 'y'

        print("\n Invalid option was entered. Please enter 'y' or 'n' to proceed.")
def safe_windows_filename(self, filename: str, substitute: str = '') -> str:
    """
    Make the filename safe for windows.

    Every character matched by the class-level ``windowsRX`` pattern is
    replaced with ``substitute``. Reserved device names such as 'CON',
    'PRN', 'AUX', 'NUL', 'COM1', 'COM2', 'COM3', 'COM4' are not handled.

    :param filename: Filename to make safe.
    :param substitute: Substitute character for invalid characters.
    :return: Safe filename.
    """
    # Use the shared class-level pattern instead of repeating its literal
    # here, so both definitions cannot drift apart.
    return self.windowsRX.sub(substitute, filename)
@staticmethod
def cli() -> int:
    """
    CLI interface for the class.

    Parses the command line, configures logging (through 'coloredlogs'
    when it is installed, the standard logging module otherwise) and runs
    the requested operation.

    :return: Exit code.
    """
    import argparse

    # Backslashes in the help text are written as '\\' so the string holds
    # no invalid escape sequences; the rendered text is unchanged.
    parser = argparse.ArgumentParser(
        description='''Filenames Renamer using regular expressions.
The pattern is in the format of 'MODE/SEARCH/REPLACE/FLAGS'.
The supported modes are:
- s: Search and replace mode.
The flags are optional and can be any combination of the following:
- i: case-insensitive.
- g: global match.
- m: multi-line match.
- s: dot matches newline.
- x: free-spacing.
- A: ASCII-only matching.
- U: Unicode matching.
- L: Locale dependent matching.
- M: Multi-line mode.
- S: DOTALL mode.
- X: VERBOSE mode.
The search pattern is a regular expression pattern it uses the Python regular expression syntax.
For the replace pattern, you can use the following special sequences:
- \\g<name>: The substring matched by the group named name.
- \\g<number>: The substring matched by the group number.
- \\g<N>: The substring matched by the N group.
- \\N: The substring matched by the N match.
- $N: The substring matched by the N match. < this one is special will be replaced with \\N.
** Note: the dollar sign $ is special case. and if you want to use it as part of the replace pattern you need to escape it with a backslash.
Examples:
- renamer 's/\\.txt/\\.md/' file1.txt file2.txt
- renamer -sfw 's/(\\d+) Tv Show Season (\\d+) ep(\\d+)/TV Show S0$2E$3 - $1/'
''',
        epilog=f'renamer v{Renamer.version} - ArabCoders',
        formatter_class=RawDescriptionHelpFormatter
    )

    # The history database lives below XDG_CONFIG_HOME, or ~/.config when
    # that variable is not set.
    defaultDir = os.environ.get(
        'XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
    dbFile = os.path.join(defaultDir, 'renamer', 'history.sqlite')

    opt_input = parser.add_argument_group('Input')
    opt_input.add_argument("rx", type=str, nargs='?',
                           help="Regex to be used for renaming.")
    opt_input.add_argument("inputs", type=str, nargs='*',
                           help="Files or paths to rename.", default=[os.getcwd()])
    opt_input.add_argument('-f', '--force', action='store_true',
                           help='Allow overwriting existing files.', default=False)
    opt_input.add_argument('--allow-hidden', action='store_true',
                           help='Allow processing hidden files.', default=False)

    opt_config = parser.add_argument_group('Configuration')
    opt_config.add_argument('-V', '--version', action='store_true',
                            help='Show version number.', default=False)
    opt_config.add_argument('-d', '--db-file', type=str,
                            help='Database file', default=dbFile)

    opt_output = parser.add_argument_group('Output')
    opt_output.add_argument('-sfw', '--safe-for-windows', action='store_true',
                            help='Make the final filename safe for Windows OS.', default=False)
    opt_output.add_argument('-S', '--substitute', type=str,
                            help='Substitute character for invalid characters.', default='')
    opt_output.add_argument('-n', '--no-action', action='store_true',
                            help='Print what will happen do not take action.', default=False)
    opt_output.add_argument('-y', '--assume-yes', action='store_true',
                            help='Assume yes for confirmation to proceed.', default=False)

    opt_log = parser.add_argument_group('Logging')
    opt_log.add_argument('-v', '--verbose', action='store_true',
                         help='Verbose mode.', default=False)
    opt_log.add_argument("-L", "--log", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                         help="logging level.", default='INFO')

    rev_grp = parser.add_argument_group('History')
    rev_grp.add_argument('-r', '--reverse', type=int,
                         help='Reverse the operation using the given id.')
    rev_grp.add_argument('-l', '--list-history', action='store_true', default=False,
                         help='List operations log.')
    rev_grp.add_argument('-k', '--view-history', type=int,
                         help='Display information about the operation by id.')
    rev_grp.add_argument('-D', '--dont-save', action='store_true', default=False,
                         help='Do not save the operation log in db.')

    args = parser.parse_args()

    if args.version:
        print(f"v{Renamer.version}", file=sys.stdout)
        return 0

    # --verbose is a shortcut for --log DEBUG.
    args.log = 'DEBUG' if args.verbose else args.log
    logLevel = getattr(logging, args.log.upper(), None)

    try:
        import coloredlogs
        coloredlogs.install(
            level=logLevel, fmt='%(asctime)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')
    except ImportError:
        logging.basicConfig(
            level=logLevel, format="%(asctime)s %(levelname)s: %(message)s", datefmt='%H:%M:%S')

    renamer = Renamer(
        logger=logging.getLogger('Renamer'),
        db_file=args.db_file
    )

    # Every parsed option is forwarded as a keyword argument.
    return renamer.take_action(**vars(args))
if __name__ == '__main__':
    # sys.exit() instead of the exit() helper: the latter is injected by the
    # 'site' module and is not available when Python runs with -S.
    sys.exit(Renamer.cli())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment