Skip to content

Instantly share code, notes, and snippets.

@dsoprea
Last active September 7, 2024 08:28
Show Gist options
  • Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.
Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.
Find all ZIPs in the source path, extract all files, and rename the base portion of each output name to the SHA1 digest of that file's data.
#!/usr/bin/env python3
# Requirements: tqdm
import sys
import os
import argparse
import logging
import zipfile
import tempfile
import contextlib
import shutil
import hashlib
import tqdm
# Help text handed to argparse as the program description.
_DESCRIPTION = (
    "Find all ZIPs in the source path, extract all files, and rename base "
    "portion of the output name to be the SHA1 digest of the data of that "
    "file. Useful for establishing a flat directory with a large number of "
    "images and ensuring uniqueness between images, not precluding the "
    "ability to update the images later while knowing which you already "
    "have, and skipping those that are already present."
)

# Module-level logger.
_LOGGER = logging.getLogger(__name__)

# Hash constructor used to derive the output file names from file content.
_ALGORITHM = hashlib.sha1
def _get_args():
    """Parse the command-line arguments.

    Returns:
        The argparse namespace, carrying the two positional arguments
        `source_path` and `target_path`.
    """

    parser = argparse.ArgumentParser(description=_DESCRIPTION)

    parser.add_argument(
        'source_path',
        help="Source path")

    parser.add_argument(
        'target_path',
        help="Target path")

    return parser.parse_args()
def _print(message):
    """Write `message`, followed by a newline, to standard error."""

    print(message, file=sys.stderr)
def _find_archives_gen(path):
    """Yield the path of every ZIP archive found beneath `path`.

    A file counts as an archive when its name ends in ".zip", compared
    case-insensitively. The tree is walked top-down, with directories and
    files both visited in alphabetical order.

    Args:
        path: Root directory to search. May be relative or absolute; the
            yielded paths are relative or absolute accordingly.

    Yields:
        The path of each archive, rooted at `path`.
    """

    for dir_path, folders, files in os.walk(path):
        # Process alphabetically for intuitiveness. Sorting `folders` in
        # place also dictates the order in which os.walk descends.
        folders.sort()

        for filename in sorted(files):
            if not filename.lower().endswith('.zip'):
                continue

            # `dir_path` already starts with `path`; joining `path` on again
            # would duplicate the prefix whenever `path` is relative.
            yield os.path.join(dir_path, filename)
@contextlib.contextmanager
def _temp_path():
    """Create a temporary directory and make it the working directory.

    On exit, whether normal or via an exception, the previous working
    directory is restored and the temporary directory is removed on a
    best-effort basis.

    Yields:
        The path of the temporary directory.
    """

    original_wd = os.getcwd()
    path = tempfile.mkdtemp()

    try:
        # Inside the `try` so that a failing chdir does not leak the directory
        os.chdir(path)
        yield path
    finally:
        try:
            os.chdir(original_wd)
        finally:
            # Best-effort cleanup: removal errors are ignored, but unlike a
            # bare `except`, KeyboardInterrupt/SystemExit are not swallowed.
            shutil.rmtree(path, ignore_errors=True)
def _digest_file(filepath, chunk_size=1024 * 1024):
    """Return the hex digest of the file at `filepath`, hashed in chunks."""

    h = _ALGORITHM()

    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)

    return h.hexdigest()


def _import_entry(z, entry, temp_path, target_path):
    """Extract one archive entry and store it under its digest name.

    The entry is extracted into `temp_path` and hashed. Unless a file with
    the resulting name is already present in `target_path`, it is copied
    there as "<digest><original extension>".

    Returns:
        True if a new file was written, False if it was already present.
    """

    extracted_filepath = z.extract(entry, path=temp_path)
    digest = _digest_file(extracted_filepath)

    # Determine if it already exists
    _, suffix = os.path.splitext(entry.filename)
    final_filename = digest + suffix
    final_filepath = os.path.join(target_path, final_filename)

    if os.path.exists(final_filepath):
        return False

    # Copy to a temporary name in the target path first, so that the final
    # name only ever appears once the data is complete
    temp_filename = '.{}.temp'.format(final_filename)
    temp_filepath = os.path.join(target_path, temp_filename)

    shutil.copyfile(extracted_filepath, temp_filepath)
    os.rename(temp_filepath, final_filepath)

    return True


def _main():
    """Extract every file from every ZIP under the source path.

    Each extracted file is written to the (flat) target path, named after the
    digest of its content plus its original extension. Files whose digest
    name is already present are skipped. Progress is reported through a tqdm
    bar whose total grows as each archive is opened.

    Raises:
        SystemExit: If the source path or the target path does not exist.
    """

    args = _get_args()

    # Explicit checks rather than `assert`, which is stripped under -O
    if not os.path.exists(args.source_path):
        raise SystemExit(
            "Source path does not exist: [{}]".format(args.source_path))

    if not os.path.exists(args.target_path):
        raise SystemExit(
            "Target path does not exist: [{}]".format(args.target_path))

    # We're gonna be changing into a temp path, so canonicalize the paths we
    # were given
    source_path = os.path.abspath(args.source_path)
    target_path = os.path.abspath(args.target_path)

    # Enumerate up front so that anything we write while processing (e.g. if
    # the target lives under the source) is not picked up by the walk
    archive_filepaths = list(_find_archives_gen(source_path))

    with tqdm.tqdm(total=0) as t:
        for archive_filepath in archive_filepaths:
            archive_basename = os.path.basename(archive_filepath)

            # We establish the temp-path here so that we don't interfere with
            # the potentially relative source-path
            with zipfile.ZipFile(archive_filepath) as z, \
                    _temp_path() as temp_path:
                # Directory entries carry no data and cannot be hashed
                entries = [
                    entry
                    for entry in z.infolist()
                    if not entry.is_dir()
                ]

                t.total += len(entries)
                t.refresh()

                for entry in entries:
                    # Set description into progress bar
                    description = \
                        '{}: {}'.format(
                            archive_basename,
                            entry.filename)

                    t.set_description(description)

                    _import_entry(z, entry, temp_path, target_path)

                    # Count skipped entries too, so that the bar can actually
                    # reach its total
                    t.update(1)
# Only run the tool when executed as a script, not when imported.
if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment