Fast duplicate file finder and (optionally) deleter, written in Python 3
#!/usr/bin/env python3
#coding=utf-8
"""
Fast duplicate file finder and deleter.
Usage: duplicates.py [--delete] <folder> [folder2 ...]

Based on https://stackoverflow.com/a/36113168/300783
Modified for Python 3 with some small code improvements.

# Based on https://gist.github.com/ntjess/1663d25d09bd762af2f0c60f600191f5
# Added duplicate file deletion parameter, sorting and detailed status display
# Based on https://gist.github.com/philmae/32fd9d91a5f65204b305b3a9e9560fbd
# Added file size conversion, cleaned output fit for purpose
"""
import argparse
import hashlib
import os
from collections import defaultdict

def chunk_reader(fobj, chunk_size=1024):
    # Generator that reads a file in chunks of bytes
    while True:
        chunk = fobj.read(chunk_size)
        if not chunk:
            return
        yield chunk

def get_hash(filename, first_chunk_only=False, hash_algo=hashlib.sha1):
    # Hash either the first 1024 bytes only, or the whole file
    hashobj = hash_algo()
    with open(filename, "rb") as f:
        if first_chunk_only:
            hashobj.update(f.read(1024))
        else:
            for chunk in chunk_reader(f):
                hashobj.update(chunk)
    return hashobj.digest()

# Converts a byte count to a human-readable size string
def convert_bytes(num):
    for unit in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if abs(num) < 1024.0:
            return "%3.1f %s" % (num, unit)
        num /= 1024.0
    return "%3.1f EB" % num  # fallback for anything larger than petabytes

# Returns a file's size in human-readable form
def file_size_check(file_path):
    if os.path.isfile(file_path):
        file_info = os.stat(file_path)
        return convert_bytes(file_info.st_size)

def check_for_duplicates(paths_list, delete=False):
    files_by_size = defaultdict(list)
    files_by_small_hash = defaultdict(list)
    files_by_full_hash = defaultdict(list)
    initial_count = 0
    initial_size = 0
    for path in paths_list:
        print(f'\nFolder Paths Scanned in {path}')
        for dirpath, _, filenames in os.walk(path):
            # Shows which folders were scanned
            print('Scanning %s...' % dirpath)
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                file_path = os.path.realpath(full_path)
                try:
                    file_size = os.path.getsize(file_path)
                except OSError:
                    # broken symlink or file removed while scanning
                    continue
                initial_count += 1
                initial_size += file_size
                files_by_size[file_size].append(file_path)

    initial_size_output = convert_bytes(initial_size)
    print(f'\nFiles Counted in Folder(s): {initial_count}')
    print(f'Files Size Total: {initial_size_output}')
    c = 0
    # For all files with the same file size, get their hash on the first 1024 bytes
    for file_size, files in files_by_size.items():
        if len(files) < 2:
            c += 1
            continue  # this file size is unique, no need to spend cpu cycles on it
        for filename in files:
            try:
                small_hash = get_hash(filename, first_chunk_only=True)
            except OSError:
                # the file access might've changed till the exec point got here
                continue
            files_by_small_hash[(file_size, small_hash)].append(filename)

    if len(files_by_size) == c:
        # every file size was unique, so there cannot be any duplicates
        print('No duplicate files found.')
    else:
        print('\n --- Duplicates detected --- \n')

    # For all files with the same hash on the first 1024 bytes, get their hash on the
    # full file - collisions will be duplicates
    for files in files_by_small_hash.values():
        if len(files) < 2:
            # the hash of the first 1k bytes is unique -> skip this file
            continue
        for filename in files:
            try:
                full_hash = get_hash(filename, first_chunk_only=False)
                # Add this file to the list of others sharing the same full hash
                files_by_full_hash[full_hash].append(filename)
            except OSError:
                # the file access might've changed till the exec point got here
                continue

    # Get a list of (hash, file list) pairs, sorted by hash
    files_by_full_hash = sorted(files_by_full_hash.items())
    # Sort each hash's file list by path (descending)
    for file_by_full_hash in files_by_full_hash:
        file_by_full_hash[1].sort(reverse=True)

    duplicate_count = sum(len(file[1]) for file in files_by_full_hash) - len(files_by_full_hash)
    duplicate_nl = '\n' if duplicate_count else ''
    print(f' Duplicate Files: {duplicate_count}{duplicate_nl}')

    duplicate_size = 0
    if duplicate_count:
        # Now, print a summary of all files that share a full hash
        for file_hash, file_list in files_by_full_hash:
            if len(file_list) < 2:
                # Only one file, it's unique
                continue
            # More than one file shares the same full hash
            files_status = []
            for file_index, file_path in enumerate(file_list):
                file_str = f' {file_path}'
                if file_index > 0:
                    # every file after the first in the set is treated as a duplicate
                    duplicate_size += os.path.getsize(file_path)
                    if delete:
                        if os.path.isfile(file_path):
                            os.remove(file_path)
                            file_str += ' [Deleted]'
                        else:
                            file_str += ' [Error]'
                    else:
                        file_str += ' [Duplicate]'
                files_status.append(file_str)
            files_dups = '\n'.join(files_status)
            files_hash = f'{int.from_bytes(file_hash, "big"):040X}'
            files_nl = '' if files_by_full_hash[-1][0] == file_hash else '\n'
            print('Duplicate File Set:')
            print("Size of file :", file_size_check(file_list[0]))
            print('___________________')
            print(f' {files_hash}:\n\n{files_dups}{files_nl}')

    duplicate_size_output = convert_bytes(duplicate_size)
    duplicate_ratio = duplicate_count / initial_count if initial_count else 0
    size_reduction = duplicate_size / initial_size if initial_size else 0
    print(f'\nDuplicate Ratio: {duplicate_count}/{initial_count} ({duplicate_ratio:.2%})')
    print(f'Size Reduction: {duplicate_size_output}/{initial_size_output} ({size_reduction:.2%})')

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('folders', nargs='*')
    parser.add_argument('--delete', help='delete duplicate files', action='store_true')
    args = parser.parse_args()

    check_for_duplicates(args.folders, args.delete)
    print('\nFile Scan Complete')
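
For reference, below is a minimal, hypothetical smoke test (not part of the gist): it assumes the script above was saved as duplicates.py, builds a throwaway folder containing two identical files and one unique file, then calls check_for_duplicates() without the delete flag so nothing is removed. The equivalent command-line run would be "python3 duplicates.py <folder>", adding --delete only when the extra copies should actually be erased.

# demo_duplicates.py -- illustrative only; file names and contents are invented
import os
import tempfile

from duplicates import check_for_duplicates  # assumes the gist was saved as duplicates.py

with tempfile.TemporaryDirectory() as tmp:
    # two identical files plus one unique file
    for name, data in [('a.txt', b'same bytes'), ('b.txt', b'same bytes'), ('c.txt', b'different')]:
        with open(os.path.join(tmp, name), 'wb') as f:
            f.write(data)

    # dry run: a.txt/b.txt are reported as one duplicate set, nothing is deleted
    check_for_duplicates([tmp], delete=False)

Because candidates are filtered first by file size and then by a 1 KB partial hash, only files that are likely to match are ever read and hashed in full, which is what keeps the scan fast on large folder trees.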