Created
January 12, 2016 12:54
-
-
Save internetimagery/dfe85caccb93021195b3 to your computer and use it in GitHub Desktop.
Move duplicate files into a folder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# put in shebang here | |
from __future__ import print_function, division | |
""" | |
Take all duplicate files from descending directories and move
them into a new folder "Duplicates" in the working directory.
""" | |
import os | |
import sys | |
import time | |
import shutil | |
import os.path | |
import hashlib | |
import functools | |
def get_files(root, ignore=None):
    """ Grab all files from a directory and children

    Walks ``root`` top-down and yields the joined path of every file found.

    root   -- directory to start walking from.
    ignore -- optional iterable of directory paths to skip. Entries are
              compared as plain strings against the paths produced by the
              walk, so they must be spelled the same way as ``root``.
              An ignored directory is skipped together with everything
              below it.
    """
    skipped = frozenset(ignore or ())
    for cwd, dirs, files in os.walk(root):
        if cwd in skipped:
            # Only reachable for the root itself: clear the list in place
            # so os.walk does not descend any further.
            dirs[:] = []
            continue
        # Prune ignored children in place; os.walk (top-down) then never
        # visits them or their descendants.
        dirs[:] = [d for d in dirs if os.path.join(cwd, d) not in skipped]
        for f in files:
            yield os.path.join(cwd, f)
def id_file(path):
    """ Provide a unique ID for a file

    The ID is a content fingerprint: a tuple of
    (SHA-256 digest as bytes, size in bytes). Two files with the same
    content produce the same ID regardless of their names.

    path -- path of the file to fingerprint.
    """
    size = 0
    sha = hashlib.sha256()
    # Binary mode is required: the hash consumes bytes, and text mode would
    # try to decode arbitrary file content (and translate newlines).
    with open(path, "rb") as f:
        # Read in fixed-size chunks so large files are never held in memory.
        for buff in iter(lambda: f.read(8192), b""):
            sha.update(buff)
            size += len(buff)
    return sha.digest(), size
def progress_bar(width, total=100):
    """ Coroutine that draws a text progress bar on stdout

    Prime it with ``next()``, then ``send()`` the current progress value.
    The bar is redrawn only when the number of filled cells grows, and the
    cursor is moved back with backspaces so the next draw overwrites it.
    Closing the generator prints a newline to finish the line.

    width -- number of cells inside the brackets.
    total -- progress value that corresponds to a full bar.
    """
    # Cells per unit of progress. float() keeps this a true division even
    # without the __future__ import; a zero total yields a bar that never
    # fills instead of a ZeroDivisionError.
    step = width / float(total) if total else 0
    last_done = 0
    try:
        while True:
            progress = yield
            # Clamp so progress beyond ``total`` cannot overflow the bar.
            done = min(width, int(progress * step))
            if last_done < done:
                last_done = done
                remaining = width - done
                bar = "[%s]" % (":" * done + "." * remaining)
                sys.stdout.write(bar)
                sys.stdout.write("\b" * len(bar))
                sys.stdout.flush()
    except GeneratorExit:
        print("\n")
def get_duplicates(root, ignore=None):
    """ Get all duplicates from directories

    Walks ``root`` top-down, fingerprints every file by
    (SHA-256 digest, size in bytes) and yields the path of each file whose
    fingerprint has already been seen. The first file with a given content
    is treated as the original and is not yielded.

    root   -- directory to start walking from.
    ignore -- optional iterable of directory paths to skip, compared as
              plain strings against the walked paths. An ignored directory
              is skipped together with everything below it.
    """
    skipped = frozenset(ignore or ())
    seen = set()
    for cwd, dirs, files in os.walk(root):
        if cwd in skipped:
            # Only reachable for the root itself: stop descending.
            dirs[:] = []
            continue
        # Prune ignored children in place so os.walk never enters them.
        dirs[:] = [d for d in dirs if os.path.join(cwd, d) not in skipped]
        for f in files:
            path = os.path.join(cwd, f)
            size = 0
            sha = hashlib.sha256()
            with open(path, "rb") as o:
                # Chunked read keeps memory use flat for large files.
                for buff in iter(lambda: o.read(8192), b""):
                    sha.update(buff)
                    size += len(buff)
            id_ = sha.digest(), size # unique ID for files
            if id_ in seen:
                yield path # Duplicate
            else:
                seen.add(id_)
if __name__ == '__main__':
    # Command-line entry point: scan a directory tree, and either list the
    # duplicate files or move them into a "Duplicates" folder in the root.
    import argparse
    parser = argparse.ArgumentParser(description="Pull out all duplicate files from decendig directories.")
    parser.add_argument("-r", "--root", help="Run from chosen directory instead of current one.")
    parser.add_argument("-l", "--list", help="List duplicate files only.", action="store_true")
    parser.add_argument("-i", "--ignore", help="Comma separated list of paths to ignore.")
    args = parser.parse_args()

    # Canonicalise the root the same way the ignore paths are canonicalised
    # below, so string comparison between walked and ignored paths lines up.
    cwd = os.path.realpath(os.getcwd())
    if args.root: # Requesting a folder
        cwd = os.path.realpath(os.path.join(cwd, args.root))
    if not os.path.isdir(cwd):
        raise IOError("Root doesn't exist.")

    folder = os.path.join(cwd, "Duplicates")
    folder_made = False # Delay Creation of folder

    # Ignore paths are resolved relative to the root; entries that are not
    # existing directories are dropped.
    ignore_paths = (os.path.realpath(os.path.join(cwd, a)) for a in (args.ignore or "").split(",") if a)
    ignore_filtered = [a for a in ignore_paths if os.path.isdir(a)]
    ignore_filtered.append(folder) # Ignore our duplicate folder

    files = functools.partial(get_files, cwd, ignore_filtered) # Generator
    # First pass only counts files so the progress bar knows its total.
    file_count = sum(1 for _ in files())
    if file_count:
        seen = set() # Track what we have seen
        progress = progress_bar(50, file_count)
        next(progress) # Initialize
        try:
            # Count from 1 so the bar is full once the last file is handled.
            for prog, path in enumerate(files(), 1):
                progress.send(prog) # Update progress bar
                id_ = id_file(path)
                if id_ not in seen:
                    seen.add(id_)
                    continue
                # Found duplicate
                if args.list:
                    print("Duplicate:", path)
                    continue
                if not folder_made:
                    folder_made = True
                    if not os.path.isdir(folder): # Folder doesn't exist? Make it
                        os.mkdir(folder)
                dest = os.path.join(folder, os.path.basename(path))
                # Insert "(copy)" before the extension until the name is free.
                # exists() rather than isfile(): if dest were a directory,
                # shutil.move would move the file *into* it.
                while os.path.exists(dest):
                    parts = os.path.splitext(dest)
                    dest = "(copy)".join(parts)
                shutil.move(path, dest)
                print("Moved:", path)
        finally:
            # Finish the progress line deterministically rather than
            # whenever the generator happens to be garbage-collected.
            progress.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment