Last active
September 7, 2024 08:28
-
-
Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.
Find all ZIPs in the source path, extract all files, and rename the base portion of each output name to the SHA1 digest of that file's data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Requirements: tqdm
import argparse
import contextlib
import hashlib
import logging
import os
import shutil
import sys
import tempfile
import zipfile

import tqdm
_DESCRIPTION = \ | |
"Find all ZIPs in the source path, extract all files, and rename base " \ | |
"portion of the output name to be the SHA1 digest of the data of that " \ | |
"file. Useful for establishing a flat directory with a large number of " \ | |
"images and ensuring uniqueness between images, not precluding the " \ | |
"ability to update the images later while knowing which you already " \ | |
"have, and skipping those that are already present." | |
_LOGGER = logging.getLogger(__name__) | |
_ALGORITHM = hashlib.sha1 | |
def _get_args(argv=None):
    """Parse the command-line arguments.

    `argv` is the list of argument strings to parse. When it is None (the
    default), argparse reads them from `sys.argv[1:]`.

    Returns the parsed namespace, which carries the two positional
    arguments `source_path` and `target_path`.
    """

    parser = argparse.ArgumentParser(description=_DESCRIPTION)

    parser.add_argument(
        'source_path',
        help="Source path")

    parser.add_argument(
        'target_path',
        help="Target path")

    return parser.parse_args(argv)
def _print(message):
    """Write `message`, followed by a newline, to standard error."""

    print(message, file=sys.stderr)
def _find_archives_gen(path):
    """Yield the file-path of every ZIP archive found under `path`.

    The tree is walked top-down. A file is treated as an archive when its
    name ends with ".zip", compared case-insensitively. Each yielded path
    starts with `path` exactly as it was given, so a relative `path`
    produces relative results.
    """

    for dir_path, folders, files in os.walk(path):
        # Process alphabetically for intuitiveness. Sorting `folders`
        # in-place also determines the order in which os.walk() descends.
        folders.sort()
        files.sort()

        for filename in files:
            if not filename.lower().endswith('.zip'):
                continue

            # `dir_path` already begins with `path`; joining `path` in again
            # would duplicate the prefix whenever `path` is relative.
            yield os.path.join(dir_path, filename)
@contextlib.contextmanager
def _temp_path():
    """Create a temporary directory and make it the working directory.

    Yields the path of the new directory. On exit, whether normal or by
    exception, the previous working directory is restored and the temporary
    directory is removed. Removal is best-effort: failures to delete are
    ignored.
    """

    original_wd = os.getcwd()
    path = tempfile.mkdtemp()

    try:
        os.chdir(path)
        yield path
    finally:
        try:
            # Leave the directory before deleting it
            os.chdir(original_wd)
        finally:
            # Still clean up if we could not change back
            shutil.rmtree(path, ignore_errors=True)
def _digest_of_file(filepath, chunk_size=1024 * 1024):
    """Return the `_ALGORITHM` hex digest of the data in `filepath`.

    The file is read `chunk_size` bytes at a time so that a large file does
    not have to be held in memory all at once.
    """

    h = _ALGORITHM()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)

    return h.hexdigest()


def _store_entry(z, entry, temp_path, target_path):
    """Extract one archive member and store it under its digest-based name.

    `z` is the open ZipFile, `entry` is one of its ZipInfo members,
    `temp_path` is where the member is extracted to, and `target_path` is the
    flat output directory. The output name is the digest of the member's
    data plus the member's original extension.

    Returns True if a new file was written to `target_path` and False if a
    file with that name was already present.
    """

    extracted_filepath = z.extract(entry, path=temp_path)
    digest = _digest_of_file(extracted_filepath)

    # Determine if it already exists
    _, suffix = os.path.splitext(entry.filename)
    final_filename = digest + suffix
    final_filepath = os.path.join(target_path, final_filename)

    if os.path.exists(final_filepath):
        return False

    # Copy to a temporary file in the target path first, so that a file with
    # the final name only ever appears once it is complete
    temp_filename = '.{}.temp'.format(final_filename)
    temp_filepath = os.path.join(target_path, temp_filename)
    shutil.copyfile(extracted_filepath, temp_filepath)

    # Rename to final file-path
    os.replace(temp_filepath, final_filepath)

    return True


def _main():
    """Extract every file from every ZIP under the source path.

    Each extracted file is written to the target path and named by the
    digest of its data, keeping the original extension. Files whose
    digest-based name already exists in the target path are skipped.
    Directory members of an archive are ignored. Progress is reported with a
    tqdm progress bar.

    Exits with an error message if the source or target path does not exist.
    """

    args = _get_args()

    # Explicit checks rather than `assert`, which is stripped under `-O`
    if not os.path.exists(args.source_path):
        sys.exit(
            "Source path does not exist: [{}]".format(args.source_path))

    if not os.path.exists(args.target_path):
        sys.exit(
            "Target path does not exist: [{}]".format(args.target_path))

    # We're gonna be changing into a temp path, so canonicalize the paths we
    # were given
    source_path = os.path.abspath(args.source_path)
    target_path = os.path.abspath(args.target_path)

    # Enumerate the archives up front, before anything is written
    archive_filepaths = list(_find_archives_gen(source_path))

    # The total is unknown until each archive is opened, so it grows as we go
    with tqdm.tqdm(total=0) as t:
        for archive_filepath in archive_filepaths:
            archive_basename = os.path.basename(archive_filepath)

            with zipfile.ZipFile(archive_filepath) as z, \
                    _temp_path() as temp_path:
                # Directory members have no data to hash or copy
                entries = [e for e in z.infolist() if not e.is_dir()]

                t.total += len(entries)
                t.refresh()

                for entry in entries:
                    # Set description into progress bar
                    description = \
                        '{}: {}'.format(archive_basename, entry.filename)

                    t.set_description(description)

                    _store_entry(z, entry, temp_path, target_path)

                    # Count skipped files too, so the bar reaches its total
                    t.update(1)
# Only run when executed as a script, so that importing this module has no
# side effects.
if __name__ == '__main__':
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment