Fast duplicate file/link finder (and optionally deleter), for Posix/NT, written in Python 3
#!/usr/bin/env python3
# coding=utf-8
# pylint: disable=C0301,R0902,R0903,R0913,W0703

"""
Fast duplicate file/link finder (and deleter)

Usage: duplicates.py [-h] [-d] [-a HASH_ALGORITHM] [-c CHUNK_SIZE] [-m MAX_SIZE] [-l LOG_LEVEL] [paths ...]

Based on https://stackoverflow.com/a/36113168/300783 by Todor Minakov
Based on https://gist.github.com/ntjess/1663d25d09bd762af2f0c60f600191f5 by Nathan Jessurun
https://gist.github.com/platomav/48663c289a1858f140869792357f6fa8 by Plato Mavropoulos

Added duplicate link detection, duplicate deletion option, sorted results, detailed status,
message logging, object-oriented structure, variable chunk size and hash algorithm inputs.
"""
import getpass
import hashlib
import os
import stat
import subprocess
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from logging import getLogger, INFO, Logger, StreamHandler


class Duplicates:
    """ Fast duplicate file/link finder (and deleter) """

    def __init__(self, in_paths: list | None = None, is_delete: bool | None = None, hash_alg: str | None = None,
                 chunk_size: int | None = None, max_size: int | None = None, log_level: int | None = None,
                 log_logger: Logger | None = None) -> None:
        _log_level: int = log_level if isinstance(log_level, int) else INFO

        if isinstance(log_logger, Logger):
            self.logger: Logger = log_logger
            self.logger.setLevel(_log_level)
        else:
            self.logger = getLogger(__name__)
            self.logger.setLevel(_log_level)

            self.logger_handler: StreamHandler = StreamHandler()
            self.logger_handler.setLevel(self.logger.getEffectiveLevel())

            self.logger.addHandler(self.logger_handler)

        self.check_paths: list = in_paths if isinstance(in_paths, list) and in_paths else []
        self.is_delete: bool = is_delete if isinstance(is_delete, bool) else False
        # Exclude shake_* algorithms: their hexdigest() requires a length argument
        self.hash_algorithm: str = str(hash_alg) if (hash_alg in hashlib.algorithms_guaranteed
                                                     and not str(hash_alg).startswith('shake_')) else 'sha256'
        self.chunk_size: int = chunk_size if isinstance(chunk_size, int) and chunk_size > 0 else 1024
        self.max_size: int = max_size if isinstance(max_size, int) and max_size > 0 else 0x200000000
        # Value passed as follow_symlinks to os.access: False (no-dereference) when supported, True otherwise
        self.follow_symlinks_false: bool = os.access not in os.supports_follow_symlinks

        self.initial_size: int = 0
        self.initial_count: int = 0
        self.duplicate_size: int = 0
        self.duplicate_count: int = 0

        self.files_by_size: dict = defaultdict(list)
        self.files_by_full_hash: dict = defaultdict(list)
        self.files_by_partial_hash: dict = defaultdict(list)

        self.duplicate_ratio: str = ''
        self.size_reduction: str = ''
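
    # Fallback defaults when arguments are omitted or invalid (per the checks above):
    # sha256 hashing, 1024-byte chunks, 0x200000000-byte (8 GB) maximum file size, INFO logging.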

    def _process_input_paths(self) -> None:
        """ Process input files/links/directories """
        for check_path in self.check_paths:
            check_path_abs: str = os.path.abspath(check_path)

            if os.path.isdir(check_path_abs):
                # noinspection PyArgumentEqualDefault
                for root_path, _, file_names in os.walk(check_path_abs, followlinks=False):
                    for file_name in file_names:
                        self._get_input_info(input_path=os.path.join(root_path, file_name))
            elif self._is_valid_path(input_path=check_path_abs, allow_broken_links=True):
                self._get_input_info(input_path=check_path_abs)
            else:
                self.logger.error('Input path is neither file/link nor directory: %s', check_path_abs)

    # noinspection PyTypeChecker
    def _process_duplicate_paths(self) -> None:
        """ Delete duplicates (if chosen) and show a summary of all processed files """
        for file_hash, file_list in sorted(self.files_by_full_hash.items()):
            file_list_len: int = len(file_list)

            if file_list_len >= 2:
                self.logger.info('Found %d files with hash %s', file_list_len, file_hash)

                # The shortest (then lexicographically first) path is kept as the original
                for file_index, file_path in enumerate(sorted(file_list, key=lambda fp: (len(fp), fp))):
                    if file_index == 0:
                        self.logger.info('%s [Original]', file_path)
                    else:
                        if os.path.islink(file_path):
                            self.duplicate_size += len(self._get_link_data(link_path=file_path))
                        else:
                            self.duplicate_size += os.path.getsize(file_path)

                        if self.is_delete:
                            self._delete_path(input_path=file_path)
                        else:
                            self.logger.info('%s [Duplicate]', file_path)

    def _get_input_hash(self, input_path: str, first_chunk: bool = False) -> str:
        """ Calculate input hash, first chunk only or entire contents """
        hash_object = getattr(hashlib, self.hash_algorithm)()

        if self._is_path_accessible(input_path=input_path, fix_access=True, allow_links=True):
            if first_chunk:
                if os.path.islink(input_path):
                    hash_object.update(self._get_link_data(link_path=input_path)[:self.chunk_size])
                else:
                    with open(input_path, 'rb') as file_object:
                        hash_object.update(file_object.read(self.chunk_size))
            else:
                if os.path.islink(input_path):
                    hash_object.update(self._get_link_data(link_path=input_path))
                else:
                    with open(input_path, 'rb') as file_object:
                        hash_object.update(file_object.read())

        return hash_object.hexdigest().upper()

    def _get_input_info(self, input_path: str) -> None:
        """ Get input information (size, count) """
        if self._is_path_accessible(input_path=input_path, fix_access=True, allow_links=True):
            if os.path.islink(input_path):
                file_size: int = len(self._get_link_data(link_path=input_path))
            else:
                file_size = os.path.getsize(input_path)

            self.initial_count += 1
            self.initial_size += file_size

            self.files_by_size[file_size].append(input_path)

    def _get_hashes_partial(self) -> None:
        """ For all same size files, get their first data chunk partial hash """
        for file_size, file_paths in self.files_by_size.items():
            if len(file_paths) >= 2:
                for file_path in file_paths:
                    hash_partial: str = self._get_input_hash(input_path=file_path, first_chunk=True)

                    # Add this file to the list of others sharing the same partial hash
                    self.files_by_partial_hash[(file_size, hash_partial)].append(file_path)

    def _get_hashes_full(self) -> None:
        """ For all same partial hash files, get their full data hash (collisions are duplicates) """
        for file_paths in self.files_by_partial_hash.values():
            if len(file_paths) >= 2:
                for file_path in file_paths:
                    # noinspection PyArgumentEqualDefault
                    hash_full: str = self._get_input_hash(input_path=file_path, first_chunk=False)

                    # Add this file to the list of others sharing the same full hash
                    self.files_by_full_hash[hash_full].append(file_path)
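
    # Winnowing strategy (three passes): files are first grouped by size, same-size files
    # are then grouped by a first-chunk hash, and only files that still collide are hashed
    # in full. Each pass skips singleton groups, so most unique files are never read whole.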

    def _is_path_accessible(self, input_path: str, fix_access: bool = False, allow_links: bool = False) -> bool:
        """ Check if file/link is accessible, otherwise attempt to fix access """
        input_path_abs: str = os.path.abspath(input_path)

        try:
            # Check (and optionally fix) input path read access, no-dereference logic (when applicable)
            if not os.access(path=input_path_abs, mode=os.R_OK, follow_symlinks=self.follow_symlinks_false):
                if fix_access:
                    self._fix_path_access(input_path=input_path_abs)

                    # Check again for input path read access, no-dereference logic (when applicable)
                    if not os.access(path=input_path_abs, mode=os.R_OK, follow_symlinks=self.follow_symlinks_false):
                        raise OSError('Path is not read accessible, access fix attempted!')
                else:
                    raise OSError('Path is not read accessible, access fix disabled!')

            # Check that input is not a symlink, when ignored
            if not allow_links and os.path.islink(input_path_abs):
                raise OSError('Path is a symbolic link!')

            # Check that input is a valid file or (broken, when allowed) symlink
            if not self._is_valid_path(input_path=input_path_abs, allow_broken_links=allow_links):
                raise OSError('Path is not a file!')

            # Check that input file is not too large (e.g. > 8GB), when applicable (i.e. non-link)
            if not os.path.islink(input_path_abs) and os.path.getsize(input_path_abs) > self.max_size:
                raise OSError(f'File is larger than {self._get_bytes_str(self.max_size)}!')

            return True
        except Exception as exception:
            self.logger.error('Failure while trying to access file %s: %s', input_path_abs, exception)
            return False

    def _fix_path_access(self, input_path: str) -> None:
        """ Attempt to fix path ownership and permissions under Posix/NT """
        input_path_abs: str = os.path.abspath(input_path)
        current_user: str = self._get_current_user()
        os_platform: str = os.name
        call_args: dict = {'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}

        try:
            if os_platform == 'posix':
                # Change Posix path ownership to current user, no-dereference logic
                _ = subprocess.call([f'chown -hfR {current_user}:{current_user} "{input_path_abs}"'], **call_args)

                # Change Posix path permissions to allow Read/Write
                _ = subprocess.call([f'chmod -fR +rw "{input_path_abs}"'], **call_args)
            elif os_platform == 'nt':
                # Change NT path ownership to current user as well as permissions to allow Read/Write
                _ = subprocess.call(['icacls', input_path_abs, '/grant', f'{current_user}:(OI)(CI)RW'], **call_args)

                # Remove NT FILE_ATTRIBUTE_READ_ONLY
                os.chmod(input_path_abs, stat.S_IWRITE)
            else:
                raise OSError(f'Unknown OS platform: "{os_platform}"!')
        except Exception as exception:
            self.logger.error('Failed to fix access of path %s: %s', input_path_abs, exception)

    def _delete_path(self, input_path: str) -> None:
        """ Delete path, if possible """
        if self._is_valid_path(input_path=input_path, allow_broken_links=True):
            # Check and, if needed, fix input path write access, no-dereference logic (when applicable)
            if not os.access(path=input_path, mode=os.W_OK, follow_symlinks=self.follow_symlinks_false):
                self._fix_path_access(input_path=input_path)

            try:
                os.remove(input_path)

                self.logger.info('%s [Deleted]', input_path)
            except OSError as exception:
                self.logger.error('%s [Error: %s]', input_path, exception)
        else:
            self.logger.error('%s [Error]', input_path)

    @staticmethod
    def _is_valid_path(input_path: str, allow_broken_links: bool = False) -> bool:
        """ Check if path is a regular file or symlink (valid or broken) """
        input_path_abs: str = os.path.abspath(input_path)

        if os.path.lexists(input_path_abs):
            if not os.path.isdir(input_path_abs):
                if allow_broken_links:
                    return os.path.isfile(input_path_abs) or os.path.islink(input_path_abs)

                return os.path.isfile(input_path_abs)

        return False

    @staticmethod
    def _get_link_data(link_path: str) -> bytes:
        """ Get the actual target path of a symlink, as UTF-8 encoded bytes """
        # noinspection PyArgumentEqualDefault
        return os.readlink(link_path).encode(encoding='utf-8', errors='replace')
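
    # Note: links are compared via their target path string (readlink, no dereference),
    # so two symlinks count as duplicates when they point at the same target path,
    # regardless of the target's contents.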

    @staticmethod
    def _get_dict_val_sum(input_dict: dict) -> int:
        """ Get sum of all list items within the values of a dictionary """
        return sum(len(values) for values in input_dict.values())

    @staticmethod
    def _get_percent_str(part: int, whole: int) -> str:
        """ Calculate percentage between "part" and "whole" values """
        return f'{part / whole if whole else 0:.2%}'
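
    # e.g. _get_percent_str(part=1, whole=4) -> '25.00%'; _get_percent_str(part=1, whole=0) -> '0.00%'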

    @staticmethod
    def _get_bytes_str(bytes_count: int | float) -> str:
        """ Append size measurement unit to bytes value """
        for bytes_unit in ('bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'):
            if bytes_count < 1024:
                break

            bytes_count /= 1024

        return f'{bytes_count:.1f} {bytes_unit}'
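
    # e.g. _get_bytes_str(512) -> '512.0 bytes'; _get_bytes_str(1536) -> '1.5 KB'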

    @staticmethod
    def _get_current_user() -> str:
        """ Get current user from terminal or system """
        try:
            current_user: str = os.getlogin()
        except OSError:
            current_user = getpass.getuser()

        return current_user

    def check_duplicates(self) -> None:
        """ Check for duplicate files/links at input paths """
        self.logger.info('Checking for duplicate files...')

        self._process_input_paths()

        self.logger.info('Files count: %d', self.initial_count)
        self.logger.info('Files size: %s', self._get_bytes_str(self.initial_size))

        self._get_hashes_partial()
        self._get_hashes_full()

        self.duplicate_count = self._get_dict_val_sum(input_dict=self.files_by_full_hash) - len(self.files_by_full_hash)

        self.logger.info('Duplicate files: %d', self.duplicate_count)

        self._process_duplicate_paths()

        self.duplicate_ratio = self._get_percent_str(part=self.duplicate_count, whole=self.initial_count)

        self.logger.info('Duplicate ratio: %d / %d files (%s)', self.duplicate_count,
                         self.initial_count, self.duplicate_ratio)

        self.size_reduction = self._get_percent_str(part=self.duplicate_size, whole=self.initial_size)

        self.logger.info('Size reduction: %s / %s (%s)', self._get_bytes_str(self.duplicate_size),
                         self._get_bytes_str(self.initial_size), self.size_reduction)

        self.logger.info('Finished checking for duplicate files!')
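

# Programmatic use, a minimal sketch (assumes this file is saved as duplicates.py;
# '/data' is a placeholder path):
#   from duplicates import Duplicates
#
#   finder = Duplicates(in_paths=['/data'], is_delete=False)
#   finder.check_duplicates()
#   print(finder.duplicate_count, finder.duplicate_ratio, finder.size_reduction)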


if __name__ == '__main__':
    parser: ArgumentParser = ArgumentParser()

    parser.add_argument('paths', nargs='*')
    parser.add_argument('-d', '--delete', help='delete duplicate files', action='store_true')
    parser.add_argument('-a', '--hash-algorithm', help='file detection hash algorithm', type=str)
    parser.add_argument('-c', '--chunk-size', help='file checking chunk size', type=int)
    parser.add_argument('-m', '--max-size', help='file checking maximum size', type=int)
    parser.add_argument('-l', '--log-level', help='message logging level', type=int)

    arguments: Namespace = parser.parse_args()

    if arguments.paths:
        Duplicates(in_paths=arguments.paths, is_delete=arguments.delete,
                   hash_alg=arguments.hash_algorithm, chunk_size=arguments.chunk_size,
                   max_size=arguments.max_size, log_level=arguments.log_level).check_duplicates()
    else:
        parser.print_help()