Created
October 16, 2024 15:34
-
-
Save RenaKunisaki/b0825d33a20ee384d70692f48a95b4d3 to your computer and use it in GitHub Desktop.
Python script for deduplicating files/moving non-duplicate files by hash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""File deduplication tool | |
by Rena Kunisaki, 2024 Oct 14 | |
Performs two functions: | |
--move fromPath toPath: Moves files from fromPath to toPath, if not already | |
present in toPath. Files are compared by hash, not name, so this skips | |
already-present files even if they have a different name. | |
--dedupe path: Scans for duplicate files in path. For each file that appears | |
more than once, prompts the user to keep one copy and removes the others. | |
Neither option will recurse into subdirectories. | |
""" | |
import hashlib | |
from pathlib import Path | |
from os import PathLike | |
import os | |
import sys | |
import shutil | |
import json | |
def getch() -> str:
    """Read and return a single character from interactive stdin.

    The terminal's ICANON flag is cleared (with VMIN=1 and VTIME=0) for the
    duration of the read, and the previous attributes are put back before
    returning, even if the read raises.

    Depends on the ``termios`` module, so it probably doesn't work on Windows.
    """
    import sys
    import termios

    # Indexes into the attribute list returned by tcgetattr().
    lflagIdx, ccIdx = 3, 6

    stdinFd = sys.stdin.fileno()
    savedAttr = termios.tcgetattr(stdinFd)
    # Fetch a second, independent copy to modify so savedAttr stays pristine.
    unbufferedAttr = termios.tcgetattr(stdinFd)
    unbufferedAttr[lflagIdx] &= ~termios.ICANON
    unbufferedAttr[ccIdx][termios.VMIN] = 1
    unbufferedAttr[ccIdx][termios.VTIME] = 0
    try:
        termios.tcsetattr(stdinFd, termios.TCSAFLUSH, unbufferedAttr)
        return sys.stdin.read(1)
    finally:
        termios.tcsetattr(stdinFd, termios.TCSAFLUSH, savedAttr)
def getFileHash(path: PathLike) -> str:
    """Return the MD5 hash of this file, as a hex string.

    The file is opened in binary mode and fed to the hasher in 8 KiB chunks,
    so the whole file is never held in memory at once.

    Args:
        path: Location of the file to hash.

    Returns:
        The hex digest of the file's contents.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # MD5 is only a content fingerprint here, not a security measure. Saying
    # so explicitly keeps this working on builds that restrict MD5 (e.g.
    # FIPS mode), where a plain hashlib.md5() call raises ValueError.
    digest = hashlib.md5(usedforsecurity=False)
    with open(path, "rb") as file:
        while chunk := file.read(8192):
            digest.update(chunk)
    return digest.hexdigest()
def getAllFileHashes(dirPath: PathLike) -> dict:
    """Return all MD5 hashes in this directory (non-recursive).

    Every entry of the directory is listed; entries for which
    os.path.isfile() is false are skipped. A single-line progress indicator
    is written to stdout while scanning.

    Args:
        dirPath: Directory to scan.

    Returns:
        A dict with two items:
            names: dict of name => hash of this file
            hashes: dict of hash => list of names that have this hash

    Raises:
        OSError: If the directory cannot be listed or a file cannot be read.
    """
    dirPath = Path(dirPath)
    names = {}  # name => hash
    hashes = {}  # hash => [name, name...]
    dirList = os.listdir(dirPath)
    nFiles = len(dirList)
    print("")
    for i, name in enumerate(dirList):
        path = dirPath / name
        # \r: return to start of line
        # \x1B[2K: clear line
        # Print percentage plus name, truncated to avoid wrapping.
        # flush=True is needed because the line has no trailing newline; on a
        # line-buffered stdout the progress would otherwise not be shown
        # until the scan is over.
        print("\r\x1B[2KScanning [%3d%%] %s" % (
            i * 100 // nFiles, str(path)[:60]), end='', flush=True)
        if os.path.isfile(path):
            digest = getFileHash(path)
            names[name] = digest
            hashes.setdefault(digest, []).append(name)
    print("")
    return {
        'names': names,
        'hashes': hashes,
    }
def reportDupeFiles(files: dict) -> None:
    """Print a JSON object listing every hash shared by more than one file.

    Args:
        files: A dict whose 'hashes' item maps hash => list of names that
            have this hash.

    The printed object maps each such hash to its list of names; hashes that
    belong to a single file are left out.
    """
    dupes = {
        digest: sameContent
        for digest, sameContent in files['hashes'].items()
        if len(sameContent) > 1
    }
    print(json.dumps(dupes))
def moveNonDupeFiles(fromPath: PathLike, toPath: PathLike) -> None:
    """Move files from fromPath to toPath, skipping any that are
    already present in toPath, renaming as needed to avoid collisions.

    Skipping already-present files is done by checking hashes, not names.
    Only regular files directly inside fromPath are considered.

    Args:
        fromPath: Directory to move files out of.
        toPath: Directory to move files into.

    Raises:
        OSError: If a directory cannot be listed or a file cannot be read
            or moved.
    """
    fromPath = Path(fromPath)
    toPath = Path(toPath)
    files = getAllFileHashes(toPath)
    names = files['names']
    hashes = files['hashes']
    for name in os.listdir(fromPath):
        path = fromPath / name
        if not os.path.isfile(path):
            continue
        digest = getFileHash(path)
        if digest in hashes:
            print("Already have:", path)
            continue

        destName = name
        if destName in names or os.path.lexists(toPath / destName):
            # splitext keeps the leading dot on the extension, so a file
            # without an extension becomes "name.1" rather than "name.1.".
            stem, ext = os.path.splitext(name)
            suffix = 1
            # Also probe the filesystem: `names` only knows regular files, so
            # a same-named subdirectory in toPath would otherwise be missed.
            while destName in names or os.path.lexists(toPath / destName):
                destName = '%s.%d%s' % (stem, suffix, ext)
                suffix += 1
            print("Name collision:", path, "=>", destName)

        print("Moving:", path)
        shutil.move(path, toPath / destName)
        # Record the move in both maps. Without this, a second source file
        # with the same content would be moved too, and a later rename could
        # pick this name and overwrite the file just moved.
        names[destName] = digest
        hashes.setdefault(digest, []).append(destName)
def promptRemoveDupeFile(dirPath: PathLike, names: list[str]) -> None:
    """Print a list of duplicate files and ask which to keep.
    Removes the ones not selected.

    The prompt repeats until a number between 1 and len(names) is entered.

    Args:
        dirPath: Directory containing the files.
        names: Names of the duplicate files within dirPath.

    Raises:
        EOFError: If stdin reaches end-of-file before a valid choice is made.
        OSError: If a file cannot be removed.
    """
    dirPath = Path(dirPath)
    print("Keep which name?")
    for i, name in enumerate(names):
        print(i+1, name)

    while True:
        if len(names) < 10:
            # only need a single character
            selection = getch()
            print('')  # move to next line
            if selection == '':
                # An empty read means end-of-file. Retrying would spin
                # forever, so fail the same way input() does below.
                raise EOFError("end of input while waiting for a selection")
        else:
            # need multiple characters plus Enter
            selection = input("> ")
        try:  # parse selection
            selection = int(selection) - 1
        except (TypeError, ValueError):
            continue
        if 0 <= selection < len(names):
            break

    for i, name in enumerate(names):
        if i != selection:
            os.remove(dirPath / name)
def interactiveRemoveDupeFiles(dirPath: PathLike) -> None:
    """Find duplicate files in this directory and resolve them interactively.

    The directory is hashed once; then, for each hash that belongs to more
    than one file, the user is prompted for which name to keep.
    """
    dirPath = Path(dirPath)
    groups = getAllFileHashes(dirPath)['hashes'].values()
    for sameContent in groups:
        if len(sameContent) <= 1:
            continue  # unique file, nothing to resolve
        promptRemoveDupeFile(dirPath, sameContent)
def showUsageAndExit():
    """Print usage for both modes to stdout, then exit with status 1."""
    prog = sys.argv[0]
    usage = (
        "Usage: ",
        "%s --move fromPath toPath" % prog,
        " Move files from fromPath to toPath; skip files\n"
        " which already have a copy in toPath (even if\n"
        " named differently).",
        "%s --dedupe path" % prog,
        " Interactively remove duplicate files in path.",
    )
    for entry in usage:
        print(entry)
    sys.exit(1)
def main(*arg):
    """Dispatch on the command-line arguments (program name excluded).

    '--move fromPath toPath' moves non-duplicate files, '--dedupe path'
    interactively removes duplicates; anything else prints the usage text
    and exits.
    """
    if not arg:
        showUsageAndExit()
    mode = arg[0]
    argCount = len(arg)
    if argCount == 3 and mode == '--move':
        moveNonDupeFiles(arg[1], arg[2])
    elif argCount == 2 and mode == '--dedupe':
        interactiveRemoveDupeFiles(arg[1])
    else:
        showUsageAndExit()


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main(*sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment