Skip to content

Instantly share code, notes, and snippets.

@0x4E69676874466F78
Last active November 18, 2020 21:35
Show Gist options
  • Save 0x4E69676874466F78/2015373fc91f5bfd2914140f5bb0ccd6 to your computer and use it in GitHub Desktop.
Save 0x4E69676874466F78/2015373fc91f5bfd2914140f5bb0ccd6 to your computer and use it in GitHub Desktop.
#!C:\Python38-32\python.exe
# This is a tool for removing duplicate files.
# Instead of deleting duplicates, it replaces each one with a SYMBOLIC LINK to the original file
# (this may be useful in a Phase Shift music directory).
# Comparison algorithm: BLAKE2s
__author__ = 'Ugubok, NightFox'
from os import path, walk, symlink, remove, sys
from hashlib import blake2s
import logging
# --- Formatters -----------------------------------------------------------
# Verbose formatter with date, milliseconds and source location.
err_formatter = logging.Formatter(
    fmt="[%(asctime)s.%(msecs)d] "
        "%(levelname)s: %(filename)s(%(lineno)d): %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Tab-separated formatter with logger name, line number and level.
dbg_formatter = logging.Formatter(
    fmt="%(asctime)-15s\t%(name)s:%(lineno)-4d\t\t%(levelname)-8s %(message)s",
)
# Compact formatter: time of day plus the message.
info_formatter = logging.Formatter(
    fmt="[%(asctime)s] %(message)s",
    datefmt="%H:%M:%S",
)

# --- Console handler ------------------------------------------------------
# Writes INFO and above to standard output using the compact format.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(info_formatter)

# --- Module logger --------------------------------------------------------
# The logger itself lets everything through; each handler applies its own
# level threshold.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.addHandler(handler)
def gethash(fpath, hash_factory=blake2s, chunk_num_blocks=128):
    """Hash a file's contents and report how many bytes were read.

    The file is read in chunks of ``chunk_num_blocks * block_size`` bytes so
    large files are never loaded into memory in one piece.

    Args:
        fpath: Path of the file to hash.
        hash_factory: Zero-argument callable returning a hashlib-style object
            (``update``, ``digest``, ``block_size``). Defaults to BLAKE2s.
        chunk_num_blocks: Number of hash blocks read per chunk.

    Returns:
        A ``(digest, size)`` tuple. If the file cannot be opened or read, the
        error is logged and ``(digest of no data, 0)`` is returned.
    """
    h = hash_factory()
    target = fpath
    if sys.platform == "win32":
        # Without the \\?\ prefix, long paths fail on Windows. The prefix is
        # Windows-only and is only valid in front of an absolute, normalized
        # path, so it must not be glued onto relative or POSIX paths.
        target = path.abspath(fpath)
        if not target.startswith("\\\\?\\"):
            if target.startswith("\\\\"):
                # Network (UNC) paths use the \\?\UNC\server\share form.
                target = "\\\\?\\UNC\\" + target[2:]
            else:
                target = "\\\\?\\" + target
    chunk_size = chunk_num_blocks * h.block_size
    try:
        with open(target, 'rb') as f:
            while chunk := f.read(chunk_size):
                h.update(chunk)
            fsize = f.tell()
    except OSError as err:
        # Covers missing files as well as permission and I/O errors, so that
        # one unreadable file does not abort the whole scan.
        log.error("{0}".format(err))
        return hash_factory().digest(), 0
    return h.digest(), fsize
def formatsize(b):
    """Render a byte count using IEEE 1541 / IEC 60027-2 binary prefixes.

    Args:
        b: Size in bytes; must be an ``int``.

    Returns:
        ``'<n> bytes'`` for values below 1 KiB, otherwise the value scaled to
        the largest fitting unit (KiB, MiB, GiB or TiB) with two decimals.
    """
    assert isinstance(b, int)
    # Largest unit first, so the first threshold reached decides the unit.
    scales = (
        (1024 ** 4, '%.2f TiB'),
        (1024 ** 3, '%.2f GiB'),
        (1024 ** 2, '%.2f MiB'),
        (1024, '%.2f KiB'),
    )
    for threshold, template in scales:
        if b >= threshold:
            return template % (b / threshold)
    return '%i bytes' % b
def _replace_with_symlink(duplicate, origin):
    """Swap the file *duplicate* for a symbolic link pointing at *origin*.

    The link is first created under a temporary name and then moved over the
    duplicate, so a failing ``symlink`` call (e.g. missing privilege on
    Windows) never costs the duplicate file.

    Returns:
        True on success; False if an OS error occurred (it is logged and the
        duplicate is left in place).
    """
    from os import replace  # not part of the file-level imports

    # A relative link target is resolved against the link's own directory,
    # not the working directory, so always link to the absolute path.
    target = path.abspath(origin)
    tmp_link = duplicate + ".symlink-tmp"
    try:
        symlink(target, tmp_link)
    except OSError as err:
        log.error("{0}".format(err))
        return False
    try:
        replace(tmp_link, duplicate)
    except OSError as err:
        log.error("{0}".format(err))
        try:
            remove(tmp_link)
        except OSError as cleanup_err:
            log.debug("{0}".format(cleanup_err))
        return False
    return True


def _log_replacement(duplicate, origin):
    """Log a duplicate/original pair, falling back to a lossy rendering."""
    line = duplicate + " <<===>> " + origin
    try:
        log.info(line)
    except UnicodeEncodeError:
        try:
            safe = line.encode().decode(sys.getdefaultencoding(), 'replace')
            log.warning(safe.replace('\ufffd', '?'))
        except UnicodeEncodeError as err:
            log.error("<FAIL_TO_DECODE> <<===>> <FAIL_TO_DECODE>")
            log.error("{0}".format(err))


def main(args):
    """Walk ``args.path`` and replace duplicate files with symbolic links.

    The first file seen with a given content hash is kept as the original;
    every later file with the same hash is replaced by a link to it.
    Existing symbolic links and files of size 0 (including files that could
    not be read) are skipped.

    Args:
        args: Object with the attributes ``path`` (directory to scan),
            ``recursive`` (descend into subdirectories), ``simulate`` (report
            only, change nothing) and ``followlinks`` (passed to ``walk``).

    Returns:
        ``(bytes_processed, bytes_removed)``: total size of the hashed files
        and total size of the duplicates that were replaced (or, when
        simulating, would have been replaced).
    """
    origins = {}  # {digest: path of the first file seen with that content}
    bytes_removed = 0
    bytes_processed = 0
    for root, _dirs, files in walk(args.path, topdown=True, onerror=None,
                                   followlinks=args.followlinks):
        for fname in files:
            fullpath = path.join(root, fname)
            if path.islink(fullpath):
                continue
            fhash, fsize = gethash(fullpath)
            if fsize == 0:
                continue
            bytes_processed += fsize

            origin = origins.get(fhash)
            if origin is None:
                origins[fhash] = fullpath
                continue

            try:
                if path.samefile(fullpath, origin):
                    # The same file reached twice (e.g. through a followed
                    # directory link): linking it to itself would destroy it.
                    continue
            except OSError as err:
                log.error("{0}".format(err))
                continue

            if not args.simulate and not _replace_with_symlink(fullpath, origin):
                # Failure already logged; nothing was replaced, so it must
                # not be counted or reported as replaced.
                continue
            bytes_removed += fsize
            _log_replacement(fullpath, origin)
        if not args.recursive:
            # walk() is top-down, so the first iteration is args.path itself.
            break
    return bytes_processed, bytes_removed
if __name__ == "__main__":
    # Command-line entry point: parse arguments, optionally attach a log
    # file, run the de-duplication and print a summary.
    import argparse
    import sys
    from time import time
    from datetime import datetime

    p = argparse.ArgumentParser()
    p.add_argument("path", help="Destination path (where to search duplicates)", type=str)
    p.add_argument("-r", "--recursive", help="Walk subdirectories recursively", action="store_true")
    p.add_argument("-s", "--simulate", help="Do NOT take real actions, only simulate", action="store_true")
    p.add_argument("-fl", "--followlinks", help="Follow links", action="store_true")
    p.add_argument("--logfile", help="Log file name", type=str)
    _args = p.parse_args()

    if _args.logfile:
        # The log file name may contain strftime directives (e.g. %Y-%m-%d).
        _args.logfile = datetime.now().strftime(_args.logfile)
        handler = logging.FileHandler(_args.logfile, 'a', 'utf-8')
        handler.setFormatter(dbg_formatter)
        handler.setLevel(logging.DEBUG)
        log.addHandler(handler)

    # DEBUG records only reach the file handler; the console handler is
    # limited to INFO and above.
    log.debug("\n\n{0}\n+{1}+\n{0}".format("=" * 80, ("%s STARTED" % __file__).center(78)))
    log.debug("Program args: " + ' '.join(sys.argv[1:]))

    if not path.exists(_args.path):
        log.error("Destination doesn't exists!")
        # sys.exit() rather than the site-provided exit(), which is meant
        # for interactive use and is not guaranteed to exist.
        sys.exit(1)

    start_time = time()
    processed, removed = main(_args)
    end_time = time()

    dTime = (end_time - start_time)
    sTime = "%.2f seconds" % dTime
    if dTime > 60:
        sTime = "%.2f minutes" % (dTime / 60)
    log.info("After %s: %s processed, %s replaced by symlinks" % (sTime, formatsize(processed), formatsize(removed)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment