Skip to content

Instantly share code, notes, and snippets.

@internetimagery
Created January 12, 2016 12:54
Show Gist options
  • Save internetimagery/dfe85caccb93021195b3 to your computer and use it in GitHub Desktop.
Save internetimagery/dfe85caccb93021195b3 to your computer and use it in GitHub Desktop.
Move duplicate files into a folder
# put in shebang here
from __future__ import print_function, division
"""
Take all duplicate files from descending directories and move
them into a new folder "Duplicates" in the working directory.
"""
import os
import sys
import time
import shutil
import os.path
import hashlib
import functools
def get_files(root, ignore=()):
    """ Yield the full path of every file under *root* and its children.

    root   -- directory to walk.
    ignore -- directories (exact path match) whose files are skipped.
              Default is an immutable tuple: a mutable ``[]`` default
              would be shared between calls.
    """
    for cwd, _, files in os.walk(root):
        # Skip any directory the caller asked us to leave alone.
        if cwd not in ignore:
            for f in files:
                yield os.path.join(cwd, f)
def id_file(path):
    """ Return a (sha256 digest, byte size) pair identifying *path*'s content.

    Two files with equal content produce equal IDs; the size is included
    as a cheap extra guard against (astronomically unlikely) collisions.
    """
    size = 0
    sha = hashlib.sha256()
    # "rb" is required: sha256.update() needs bytes (text mode raises
    # TypeError on Python 3 and mangles newlines on Windows).
    with open(path, "rb") as f:
        while True:
            buff = f.read(8192)  # hash in chunks so huge files stay cheap
            if not buff:
                break
            sha.update(buff)
            size += len(buff)
    return sha.digest(), size
def progress_bar(width, total=100):
    """ Coroutine that draws an in-place text progress bar on stdout.

    width -- bar width in characters.
    total -- the progress value that fills the bar completely.

    Prime with next(), then send() the current progress count; the bar is
    redrawn only when it actually advances.  Closing the generator prints
    a trailing blank line.
    """
    step = width / total  # bar characters per unit of progress
    last_done = 0
    try:
        while True:
            progress = yield
            done = int(progress * step)
            if last_done < done:  # only redraw when the bar advances
                last_done = done
                remaining = width - done
                bar = "[%s]" % (":" * done + "." * remaining)
                sys.stdout.write(bar)
                # Backspace over the bar so the next draw overwrites it.
                sys.stdout.write("\b" * len(bar))
                sys.stdout.flush()
    except GeneratorExit:
        # Caller closed us: move past the bar line.
        print("\n")
def get_duplicates(root, ignore=()):
    """ Yield the path of every duplicate file under *root*.

    A file is a duplicate when an earlier-walked file had the same
    (sha256 digest, size) content ID; the first copy seen is never
    yielded.

    root   -- directory to walk.
    ignore -- directories (exact path match) whose files are skipped.
    """
    # NOTE: the original had `dispatch = itertools.tee(...)` here, but
    # `itertools` was never imported (NameError) and the result was
    # unused, so the dead line is removed.
    seen = set()
    for cwd, _, files in os.walk(root):
        if cwd in ignore:
            continue
        for f in files:
            path = os.path.join(cwd, f)
            # Hash in chunks so large files never load into memory at once.
            size = 0
            sha = hashlib.sha256()
            with open(path, "rb") as o:
                while True:
                    buff = o.read(8192)
                    if not buff:
                        break
                    sha.update(buff)
                    size += len(buff)
            id_ = sha.digest(), size  # unique ID for files
            if id_ in seen:
                yield path  # Duplicate
            else:
                seen.add(id_)
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description="Pull out all duplicate files from descending directories.")
    parser.add_argument("-r", "--root", help="Run from chosen directory instead of current one.")
    parser.add_argument("-l", "--list", help="List duplicate files only.", action="store_true")
    parser.add_argument("-i", "--ignore", help="Comma separated list of paths to ignore.")
    args = parser.parse_args()

    cwd = os.getcwd()
    if args.root:  # Requesting a folder other than the current one
        cwd = os.path.realpath(os.path.join(cwd, args.root))
        if not os.path.isdir(cwd):
            raise IOError("Root doesn't exist.")

    folder = os.path.join(cwd, "Duplicates")
    folder_made = False  # Delay creation of the folder until the first move

    # Resolve ignore paths relative to the root; keep only real directories.
    ignore_paths = (os.path.realpath(os.path.join(cwd, a)) for a in (args.ignore or "").split(",") if a)
    ignore_filtered = list(a for a in ignore_paths if os.path.isdir(a))
    ignore_filtered.append(folder)  # Ignore our duplicate folder

    files = functools.partial(get_files, cwd, ignore_filtered)  # Generator factory
    # First pass: count files so the progress bar can be scaled.
    file_count = 0
    for f in files():
        file_count += 1

    if file_count:
        seen = set()  # Content IDs we have already encountered
        progress = progress_bar(50, file_count)
        next(progress)  # Prime the coroutine
        for prog, path in enumerate(files()):
            progress.send(prog)  # Update progress bar
            id_ = id_file(path)
            if id_ not in seen:
                seen.add(id_)
            else:  # Found duplicate
                if args.list:
                    print("Duplicate:", path)
                else:
                    if not folder_made:
                        folder_made = True
                        if not os.path.isdir(folder):  # Folder doesn't exist? Make it
                            os.mkdir(folder)
                    dest = os.path.join(folder, os.path.basename(path))
                    # Avoid clobbering: insert "(copy)" before the extension
                    # until the destination name is free.
                    while os.path.isfile(dest):
                        parts = os.path.splitext(dest)
                        dest = "(copy)".join(parts)
                    shutil.move(path, dest)
                    print("Moved:", path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment