Created
October 26, 2023 09:38
-
-
Save cmpute/300b10ebaaf84b825e0536030524c493 to your computer and use it in GitHub Desktop.
File deduplication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, os, binascii | |
from imohash import hashfile | |
from pathlib import Path | |
from tqdm import tqdm | |
from collections import defaultdict | |
from time import time | |
from hashlib import md5 | |
# When True, files are grouped by their imohash content hash; when False they
# are grouped by bare file name only.
USE_HASH = True
# When True, dedup_inplace prints per-file timing while scanning.
PROFILE = False
# for demonstrating hash performance
def hashfullfile(file: Path) -> bytes:
    """Return the MD5 digest of the complete contents of *file*.

    The whole file is read into memory before hashing.

    Args:
        file: Path of the file to hash.

    Returns:
        The 16-byte MD5 digest as ``bytes``.
    """
    # digest must be *called*; the bare attribute is a bound method, not bytes.
    return md5(file.read_bytes()).digest()
def dedup_inplace(dir: Path):
    """Report groups of duplicate files found under *dir*.

    Walks the tree rooted at ``dir`` and groups files by ``hashfile`` result
    when ``USE_HASH`` is true, or by bare file name otherwise. Every group
    with more than one member is printed to stdout; no file is modified.

    Pressing Ctrl-C stops the scan early; duplicates among the files
    collected up to that point are still reported.

    Args:
        dir: Root directory to scan.
    """
    database = []  # every visited file; the maps hold indices into this list
    dir_count = 0
    name_map = defaultdict(list)
    fhash_map = defaultdict(list)

    # collect file info
    t_start = time()
    t_prev = time()
    file_count = 0
    try:
        for path, _dirs, files in tqdm(os.walk(dir), leave=False):
            dir_count += 1
            for fname in files:
                # os.walk already yields `path` prefixed with the root, so
                # joining `dir` again would duplicate it for relative roots.
                file = Path(path) / fname
                file_idx = len(database)
                database.append(file)

                if USE_HASH:
                    try:
                        fhash_map[hashfile(file)].append(file_idx)
                    except Exception:
                        # Best effort: an unreadable file is reported and
                        # skipped. KeyboardInterrupt is not an Exception, so
                        # Ctrl-C still reaches the outer handler.
                        tqdm.write("Error occured in reading " + str(file))
                else:
                    name_map[file.name].append(file_idx)

                if PROFILE:
                    file_count += 1
                    t_proc = time() - t_prev
                    t_proc_total = time() - t_start
                    tqdm.write("Processed in %f secs (%f avg)"
                               % (t_proc, t_proc_total / file_count))
                    t_prev = time()
    except KeyboardInterrupt:
        # Only a user interrupt is tolerated; real errors propagate.
        print("Execution terminated!")

    # report duplicates
    print("Total %d directories" % dir_count)
    map_to_use = fhash_map if USE_HASH else name_map
    for name, file_idxs in map_to_use.items():
        if len(file_idxs) <= 1:
            continue
        if USE_HASH:
            # hash keys are raw bytes; show them as hex
            name = binascii.hexlify(name).decode()
        print("----------")
        print("Duplicate:", name)
        for idx in file_idxs:
            print("\t" + str(database[idx]))
def dedup_with_target(dedup_dir: Path, base_dir: Path):
    """Report files that occur in both *dedup_dir* and *base_dir*.

    Both trees are walked and their files grouped by ``hashfile`` result when
    ``USE_HASH`` is true, or by bare file name otherwise. A group is printed
    only when it contains files from both trees; groups whose members all
    come from the same tree are skipped. In the output, lines starting with
    ``+`` are files under ``dedup_dir`` and lines starting with ``-`` are
    files under ``base_dir``. No file is modified.

    Pressing Ctrl-C stops scanning early; matches among the files collected
    up to that point are still reported.

    Args:
        dedup_dir: Directory whose files are checked for duplicates.
        base_dir: Directory the files are compared against.
    """
    src_database = []  # files found under base_dir
    dst_database = []  # files found under dedup_dir
    dir_count = 0
    name_map = defaultdict(list)
    fhash_map = defaultdict(list)

    # Map entries are (in_dst, index) pairs: in_dst is True for files from
    # dedup_dir (index into dst_database) and False for files from base_dir
    # (index into src_database).
    scans = (
        (False, base_dir, src_database),
        (True, dedup_dir, dst_database),
    )

    # collect file info from both directories with identical error handling
    try:
        for in_dst, root, database in scans:
            for path, _dirs, files in tqdm(os.walk(root), leave=False):
                dir_count += 1
                for fname in files:
                    # os.walk already yields `path` prefixed with the root,
                    # so joining the root again would duplicate it for
                    # relative roots.
                    file = Path(path) / fname
                    file_idx = len(database)
                    database.append(file)
                    if USE_HASH:
                        try:
                            fhash_map[hashfile(file)].append((in_dst, file_idx))
                        except Exception:
                            # Best effort: report and skip unreadable files
                            # instead of aborting the scan. Ctrl-C is not an
                            # Exception and reaches the outer handler.
                            tqdm.write("Error occured in reading " + str(file))
                    else:
                        name_map[file.name].append((in_dst, file_idx))
    except KeyboardInterrupt:
        # Only a user interrupt is tolerated; real errors propagate.
        print("Execution terminated!")

    # report duplicates
    print("Total %d directories" % dir_count)
    map_to_use = fhash_map if USE_HASH else name_map
    for name, file_idxs in map_to_use.items():
        if len(file_idxs) <= 1:
            continue
        if len({in_dst for in_dst, _ in file_idxs}) < 2:
            # skip inplace duplicates (all members from the same tree)
            continue
        if USE_HASH:
            # hash keys are raw bytes; show them as hex
            name = binascii.hexlify(name).decode()
        print("----------")
        print("Duplicate:", name)
        for in_dst, idx in file_idxs:
            if in_dst:
                print("\t+ " + str(dst_database[idx]))
            else:
                print("\t- " + str(src_database[idx]))
if __name__ == "__main__":
    # Dispatch on the number of command-line entries (script name included):
    # one path scans a single tree, two paths compare the first against the
    # second.
    argc = len(sys.argv)
    if argc == 2:
        dedup_inplace(Path(sys.argv[1]))
    elif argc == 3:
        first, second = sys.argv[1], sys.argv[2]
        dedup_with_target(Path(first), Path(second))
    elif argc == 1:
        print("Please provide paths input: python dedup.py <dedup directory> [target directory]")
    else:
        print("Unrecongized input!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment