Created
March 28, 2022 14:26
-
-
Save trougnouf/fb0a939c78840a41821bd4764e5267e3 to your computer and use it in GitHub Desktop.
Find a random picture to develop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import yaml | |
import random | |
import zlib | |
import tqdm | |
# File-name extensions (without the dot) that img_weight treats as raw camera
# files, and the selection weight such files receive.
RAW_EXT = ['RAF', 'NEF', 'ARW', 'dng', 'nef', 'CR2']
RAW_WEIGHT = 4
# Extensions of already-rendered still images (weighted by JPG_WEIGHT).
FLAT_EXT = ['JPG', 'jpg', 'JPEG', 'jpeg', 'xcf', 'tif', 'png', 'gif']
# Extensions that img_weight skips without printing an "unknown extension" notice.
IGNORED_EXT = ['xmp', 'gpx', 'txt', 'py', 'zip', 'sh', 'gpxtracks', 'json', 'pdf', 'THM', 'LRV', 'tar.xz', 'tar.bz2', 'log', 'db', 'ini', 'xml', 'thm']
JPG_WEIGHT = 1
VIDEO_EXT = ['3gpp', 'MOV', 'mkv', 'MP4', 'mp4', 'webm', 'MPG', 'avi']
VIDEO_WEIGHT = 1
# Directories whose path contains one of these names are skipped by absolute_walk.
IGNORED_DIRS = ['darktable_exported', 'camera_exported']
CACHE_DIR = '.cache'
CHECKSUMS_CACHE_FN = 'img_checksums.yaml'
# Default root of the picture collection that gets scanned.
PICS_DPATH = os.path.join(os.path.sep, 'orb', 'Pictures', 'ITookAPicture')
# A directory containing a file with this name has its own files skipped by absolute_walk.
DONE_FN = '.DONE'
# When True, anal_dir prints a line for every duplicate file it drops.
WARN_DUPLICATE = False
CHECKSUMS_CACHE_FPATH = os.path.join(CACHE_DIR, CHECKSUMS_CACHE_FN)
# NOTE(review): not referenced by any function visible in this file.
checksums = set()
def adler32(fpath, chunksize=65536):
    """Return the Adler-32 checksum of the contents of the file at *fpath*.

    The file is opened in binary mode and read ``chunksize`` bytes at a time,
    so the whole file is never held in memory at once.

    The running value is seeded with 0 instead of zlib's default of 1, so the
    result equals ``zlib.adler32(data, 0)`` and an empty file yields 0.  The
    seed is kept unchanged so that values already stored in an existing
    checksum cache stay comparable with newly computed ones.

    This function only computes the checksum; storing it in a cache is the
    caller's job.
    """
    checksum = 0
    with open(fpath, "rb") as f:
        while chunk := f.read(chunksize):
            checksum = zlib.adler32(chunk, checksum)
    return checksum
def get_cache(cache_fpath: str) -> dict:
    """Load the YAML cache stored at *cache_fpath*.

    Always returns a dict.  An empty dict is returned when the file does not
    exist, cannot be parsed as YAML, or does not contain a mapping (for
    example an empty file, which YAML loads as ``None``).

    When the file is missing, its parent directory is created so that a later
    save to the same path can succeed.
    """
    if not os.path.isfile(cache_fpath):
        print(f'{cache_fpath} not found')
        cache_dpath = os.path.dirname(cache_fpath)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if cache_dpath:
            os.makedirs(cache_dpath, exist_ok=True)
        return {}
    with open(cache_fpath, 'r') as stream:
        try:
            cache = yaml.safe_load(stream)
        except yaml.YAMLError as e:
            # A corrupt cache is not fatal: report it and start from scratch.
            print(e)
            return {}
    return cache if isinstance(cache, dict) else {}
def save_cache(cache_fpath: str, cache):
    """Write *cache* to *cache_fpath* as YAML, overwriting any existing file.

    The parent directory of *cache_fpath* is created first if it is missing.
    """
    cache_dpath = os.path.dirname(cache_fpath)
    # dirname is '' for a bare file name; os.makedirs('') would raise.
    if cache_dpath:
        os.makedirs(cache_dpath, exist_ok=True)
    with open(cache_fpath, 'w') as stream:
        yaml.dump(cache, stream)
def absolute_walk(root_dpath: str):
    """Yield the path of each regular file found while walking *root_dpath*.

    A directory's own files are skipped when it contains a DONE_FN marker
    file, or when its path contains any IGNORED_DIRS entry as a substring.
    The walk still descends into the subdirectories of a skipped directory.
    Yielded paths are the directory path joined with the file name.
    """
    for cur_dpath, _subdirs, names in os.walk(root_dpath):
        if os.path.isfile(os.path.join(cur_dpath, DONE_FN)):
            continue
        if any(ignored in cur_dpath for ignored in IGNORED_DIRS):
            continue
        candidates = (os.path.join(cur_dpath, name) for name in names)
        # isfile drops entries that are not regular files (e.g. broken links).
        yield from filter(os.path.isfile, candidates)
def _has_ext(fpath: str, extensions) -> bool:
    """Return True if *fpath* ends with '.' plus one of *extensions*, ignoring case."""
    suffixes = tuple('.' + ext.lower() for ext in extensions)
    return fpath.lower().endswith(suffixes)


def img_weight(fpath: str) -> int:
    """Return the selection weight of the file at *fpath*, based on its extension.

    Raw files get RAW_WEIGHT, flat images JPG_WEIGHT and videos VIDEO_WEIGHT.
    Anything else gets 0; a notice is printed unless the extension is listed
    in IGNORED_EXT.

    Extensions are matched case-insensitively and must follow a dot, so
    ``IMG.PNG`` is recognised and a file merely named ``trash`` is not
    mistaken for an ``sh`` file.
    """
    if _has_ext(fpath, RAW_EXT):
        return RAW_WEIGHT
    if _has_ext(fpath, FLAT_EXT):
        return JPG_WEIGHT
    if _has_ext(fpath, VIDEO_EXT):
        return VIDEO_WEIGHT
    if not _has_ext(fpath, IGNORED_EXT):
        print(f'img_weight: ignoring unknown extension in {fpath}')
    return 0
class Image:
    """A candidate file: its path plus the weight img_weight assigns to it."""

    def __init__(self, fpath: str):
        # The weight is computed once, from the path alone.
        self.weight = img_weight(fpath)
        self.fpath = fpath

    def __repr__(self):
        # Printing an Image shows just its path.
        return self.fpath
def anal_dir(checksums_cache: dict, root_dpath = PICS_DPATH) -> tuple[list, int]:
    """Scan *root_dpath* and collect the distinct, weighted files found in it.

    Args:
        checksums_cache: mapping of file path to checksum.  Paths missing
            from it are hashed with adler32 and added, so the dict is
            updated in place.
        root_dpath: directory to walk.

    Returns:
        ``(images, images_sum)``: the list of Image objects kept, and the sum
        of their weights.  Files with weight 0 are skipped, and of several
        files sharing a checksum only the first one encountered is kept.
    """
    seen_checksums = set()
    images = []
    images_sum = 0
    for fpath in tqdm.tqdm(absolute_walk(root_dpath)):
        image = Image(fpath)
        if image.weight == 0:
            continue
        checksum = checksums_cache.get(fpath)
        # Test for None, not falsiness: adler32 yields 0 for an empty file,
        # and such a cached value must not be recomputed on every run.
        if checksum is None:
            print(f'anal_dir: found new file {fpath}')
            checksum = adler32(fpath)
            checksums_cache[fpath] = checksum
        if checksum in seen_checksums:
            if WARN_DUPLICATE:
                print(f'anal_dir: ignoring duplicate file {fpath}')
            continue
        seen_checksums.add(checksum)
        images.append(image)
        images_sum += image.weight
    return images, images_sum
def find_random_image(images, images_sum):
    """Pick one element of *images* with probability proportional to its weight.

    Args:
        images: sequence of objects exposing an integer ``weight`` attribute.
        images_sum: the sum of those weights.

    Returns:
        The selected image, or None when there is nothing to choose from.
    """
    # Draw from 1..images_sum so each image owns exactly `weight` of the
    # possible values; starting at 0 would give the first image one extra.
    img_id = random.randint(1, images_sum) if images_sum > 0 else 0
    print(f'id {img_id}/{images_sum}')
    cur_id = 0
    for image in tqdm.tqdm(images):
        cur_id += image.weight
        if cur_id >= img_id:
            return image
    return None
if __name__ == '__main__':
    # Script entry point: load the checksum cache, scan the picture tree,
    # print one weighted-random pick, then write the cache back.
    checksums_cache = get_cache(cache_fpath=CHECKSUMS_CACHE_FPATH)
    if not checksums_cache:
        checksums_cache = dict()
    try:
        images, images_sum = anal_dir(checksums_cache, PICS_DPATH)
        image = find_random_image(images, images_sum)
        print(image)
    finally:
        # Save even when the scan fails or is interrupted, so checksums
        # already computed are not thrown away.
        save_cache(cache_fpath=CHECKSUMS_CACHE_FPATH, cache=checksums_cache)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment