Last active
November 18, 2020 21:35
-
-
Save 0x4E69676874466F78/2015373fc91f5bfd2914140f5bb0ccd6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!C:\Python38-32\python.exe | |
# This is a tool for removing duplicate files.
# Instead of deleting duplicates, it replaces each one with a SYMBOLIC LINK to the original file
# (it may be useful in Phase Shift music directory) | |
# Algorithm for compare: BLAKE2s | |
__author__ = 'Ugubok, NightFox' | |
from os import path, walk, symlink, remove, sys | |
from hashlib import blake2s | |
import logging | |
# Module-wide logger. The logger itself passes everything from DEBUG up;
# each attached handler applies its own threshold.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Detailed format with a millisecond timestamp. The milliseconds are
# zero-padded to three digits so that 5 ms prints as ".005" rather than ".5".
err_formatter = logging.Formatter(
    "[%(asctime)s.%(msecs)03d] "
    "%(levelname)s: %(filename)s(%(lineno)d): %(message)s",
    "%Y-%m-%d %H:%M:%S",
)
# Tab-separated, column-aligned format (applied to the log file handler when
# the script is started with --logfile).
dbg_formatter = logging.Formatter(
    "%(asctime)-15s\t%(name)s:%(lineno)-4d\t\t%(levelname)-8s %(message)s"
)
# Compact format for console output: time of day and the message only.
info_formatter = logging.Formatter("[%(asctime)s] %(message)s", "%H:%M:%S")

# Console handler: INFO and above go to stdout.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(info_formatter)
handler.setLevel(logging.INFO)
log.addHandler(handler)
def gethash(fpath, hash_factory=blake2s, chunk_num_blocks=128):
    """Hash the contents of a file and measure its size in one pass.

    Args:
        fpath: Path of the file to read.
        hash_factory: Zero-argument callable returning a hashlib-style object
            (must provide ``update``, ``digest`` and ``block_size``).
        chunk_num_blocks: How many hash blocks to read per chunk; the file is
            streamed so large files are never loaded into memory at once.

    Returns:
        A ``(digest, size)`` tuple. If the file cannot be opened or read, the
        error is logged and ``size`` is 0, which lets callers skip the file.
    """
    import os  # local import: the module only imports individual names from os

    h = hash_factory()
    fsize = 0
    open_path = fpath
    if os.name == "nt":
        # Without the \\?\ prefix Windows fails on paths longer than MAX_PATH.
        # The prefix switches off path normalisation, so it is only valid in
        # front of an absolute path (and UNC shares need the \\?\UNC\ form).
        open_path = os.path.abspath(fpath)
        if not open_path.startswith("\\\\?\\"):
            if open_path.startswith("\\\\"):
                open_path = "\\\\?\\UNC\\" + open_path[2:]
            else:
                open_path = "\\\\?\\" + open_path
    try:
        with open(open_path, 'rb') as f:
            while chunk := f.read(chunk_num_blocks * h.block_size):
                h.update(chunk)
            # Only set after a complete read, so a failure part-way through
            # still reports size 0.
            fsize = f.tell()
    except OSError as err:
        # Covers missing files as well as permission and I/O errors, so one
        # unreadable file does not abort the whole scan.
        log.error("{0}".format(err))
    return h.digest(), fsize
def formatsize(b):
    """Format a byte count using binary prefixes (IEEE 1541 / IEC 60027-2).

    Values below 1024 are printed as whole bytes; larger values are scaled to
    the biggest unit that fits (KiB, MiB, GiB, TiB) with two decimals.
    """
    assert isinstance(b, int)
    base = 1024
    if b < base:
        return '%i bytes' % b
    # Try each unit in ascending order; the first whose upper bound exceeds
    # the value wins. Anything beyond GiB falls through to TiB.
    for power, suffix in ((1, 'KiB'), (2, 'MiB'), (3, 'GiB')):
        if b < base ** (power + 1):
            return '%.2f %s' % (b / base ** power, suffix)
    return '%.2f TiB' % (b / base ** 4)
def _replace_with_symlink(duplicate, original):
    """Swap the file *duplicate* for a symlink to *original*, link first."""
    from os import replace  # not among the names the module imports from os

    # A relative link target is resolved against the link's own directory,
    # not the working directory, so always point at an absolute path.
    target = path.abspath(original)
    staging = duplicate + ".symlink-tmp"
    # Create the link under a temporary name and only then move it over the
    # duplicate: if linking fails (e.g. missing privilege), the duplicate is
    # still intact instead of already deleted.
    symlink(target, staging)
    try:
        replace(staging, duplicate)
    except OSError:
        remove(staging)
        raise


def _log_duplicate(duplicate, original):
    """Report a duplicate/original pair, tolerating unencodable file names."""
    message = duplicate + " <<===>> " + original
    try:
        log.info(message)
    except UnicodeEncodeError:
        # Best-effort fallback: replace every non-ASCII character with '?'.
        log.warning(message.encode('ascii', 'replace').decode('ascii'))


def main(args):
    """Find files with identical content and replace duplicates by symlinks.

    Args:
        args: Parsed options providing ``path`` (directory to scan),
            ``recursive`` (descend into subdirectories), ``simulate`` (report
            only, change nothing) and ``followlinks`` (passed to ``walk``).

    Returns:
        A ``(bytes_processed, bytes_removed)`` tuple: total size of all hashed
        files, and total size of duplicates that were replaced (or, when
        simulating, would have been replaced).
    """
    origins = {}  # {content hash: path of the first file seen with it}
    bytes_removed = 0
    bytes_processed = 0
    for root, _dirs, files in walk(args.path, True, None, args.followlinks):
        for fname in files:
            fullpath = path.join(root, fname)
            if path.islink(fullpath):
                continue
            fhash, fsize = gethash(fullpath)
            if fsize == 0:
                # Empty or unreadable file: nothing to gain from linking.
                continue
            bytes_processed += fsize

            original = origins.get(fhash)
            if original is None:
                origins[fhash] = fullpath
                continue

            try:
                # The same physical file can show up under two paths (hard
                # link, or a followed directory link). Replacing it would
                # destroy the only copy and leave a self-referential link.
                if path.samefile(original, fullpath):
                    continue
                if not args.simulate:
                    _replace_with_symlink(fullpath, original)
            except (OSError, NotImplementedError) as err:
                log.error("{0}".format(err))
                continue

            # Counted only for successful (or simulated) replacements.
            bytes_removed += fsize
            _log_duplicate(fullpath, original)
        if not args.recursive:
            # walk() yields the top directory first; stop after it.
            break
    return bytes_processed, bytes_removed
if __name__ == "__main__":
    # Command-line entry point: parse options, optionally attach a log file,
    # run the scan and print a summary.
    import argparse
    import sys
    from time import monotonic
    from datetime import datetime

    p = argparse.ArgumentParser()
    p.add_argument("path", help="Destination path (where to search duplicates)", type=str)
    p.add_argument("-r", "--recursive", help="Walk subdirectories recursively", action="store_true")
    p.add_argument("-s", "--simulate", help="Do NOT take real actions, only simulate", action="store_true")
    p.add_argument("-fl", "--followlinks", help="Follow links", action="store_true")
    p.add_argument("--logfile", help="Log file name", type=str)
    _args = p.parse_args()

    if _args.logfile:
        # The log file name may contain strftime directives (e.g. %Y-%m-%d).
        _args.logfile = datetime.now().strftime(_args.logfile)
        handler = logging.FileHandler(_args.logfile, 'a', 'utf-8')
        handler.setFormatter(dbg_formatter)
        handler.setLevel(logging.DEBUG)
        log.addHandler(handler)

    # DEBUG records are below the console handler's INFO threshold, so these
    # two lines only ever appear in the log file.
    log.debug("\n\n{0}\n+{1}+\n{0}".format("=" * 80, ("%s STARTED" % __file__).center(78)))
    log.debug("Program args: " + ' '.join(sys.argv[1:]))

    if not path.exists(_args.path):
        log.error("Destination doesn't exists!")
        # sys.exit rather than the site-provided exit(), which is missing
        # when Python runs with -S or from a frozen executable.
        sys.exit(1)

    # A monotonic clock keeps the elapsed time correct even if the system
    # clock is adjusted while the scan is running.
    start_time = monotonic()
    processed, removed = main(_args)
    dTime = monotonic() - start_time

    sTime = "%.2f seconds" % dTime
    if dTime > 60:
        sTime = "%.2f minutes" % (dTime / 60)
    log.info("After %s: %s processed, %s replaced by symlinks" % (sTime, formatsize(processed), formatsize(removed)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment