Last active
September 2, 2025 00:06
-
-
Save mvexel/2cb64c84c3323375d4e460b4adca798c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Photo sorting script that organizes photos by camera/model and then by year/month | |
| based on EXIF data. | |
| """ | |
| import os | |
| import shutil | |
| import argparse | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import Optional, Tuple, Dict, Any, List | |
| from PIL import Image | |
| from hachoir.parser import createParser # type: ignore | |
| from hachoir.metadata import extractMetadata # type: ignore | |
| from tqdm import tqdm # type: ignore | |
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# File extensions
# Photo formats routed through the EXIF pipeline. Includes RAW formats
# (.cr2/.nef/.arw/.dng) which get special dimension handling, and .heic
# which requires the optional pillow-heif opener.
PHOTO_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".tiff",
    ".tif",
    ".raw",
    ".cr2",
    ".nef",
    ".arw",
    ".dng",
    ".heic",
    ".webp",
}
# Video container formats routed through the hachoir metadata pipeline.
VIDEO_EXTENSIONS = {
    ".mp4",
    ".mov",
    ".avi",
    ".mkv",
    ".wmv",
    ".flv",
    ".webm",
    ".m4v",
    ".3gp",
    ".mts",
    ".m2ts",
}
def get_dng_dimensions_exifread(image_path: Path) -> Optional[Tuple[int, int]]:
    """Try to get DNG dimensions using the ExifRead library.

    Args:
        image_path: Path to the DNG file.

    Returns:
        (width, height) on success, or None when exifread is not installed,
        the file cannot be read, or the dimension tags are absent.
    """
    try:
        import exifread  # type: ignore

        with open(image_path, "rb") as f:
            tags = exifread.process_file(f, details=False)
        width_tag = tags.get("EXIF ExifImageWidth") or tags.get("Image ImageWidth")
        height_tag = tags.get("EXIF ExifImageLength") or tags.get(
            "Image ImageLength"
        )
        if width_tag and height_tag:
            return int(str(width_tag.values[0])), int(str(height_tag.values[0]))
    except Exception as e:
        # Fix: the original `except (ImportError, Exception)` was redundant —
        # Exception already subsumes ImportError. The broad catch itself is
        # deliberate: this is a best-effort probe; any failure means "unknown".
        logging.debug(
            f"Could not get DNG dimensions with exifread for {image_path}: {e}"
        )
    return None
def get_raw_dimensions(image_path: Path) -> Optional[Tuple[int, int]]:
    """Try to get raw image dimensions using rawpy.

    Args:
        image_path: Path to a RAW image file.

    Returns:
        (raw_width, raw_height), or None when rawpy is not installed or
        the file cannot be parsed.
    """
    try:
        import rawpy  # type: ignore

        with rawpy.imread(str(image_path)) as raw:
            return raw.sizes.raw_width, raw.sizes.raw_height
    except Exception as e:
        # Fix: `except (ImportError, Exception)` was redundant — Exception
        # already covers ImportError. Best-effort probe; failure means "unknown".
        logging.debug(f"Could not get RAW dimensions with rawpy for {image_path}: {e}")
    return None
def get_exif_data(
    image_path: Path,
) -> Optional[Tuple[str, datetime, Optional[Tuple[int, int]]]]:
    """Extract EXIF data from an image file.

    Args:
        image_path: Path to the image.

    Returns:
        (camera_info, date_taken, size) where camera_info is "Make_Model"
        with spaces replaced by underscores, date_taken falls back to the
        file's mtime, and size is (width, height) or None/sentinel for
        undecodable formats. Returns None only if even stat() fails.
    """
    try:
        if image_path.suffix.lower() == ".heic":
            try:
                import pillow_heif  # type: ignore

                pillow_heif.register_heif_opener()
            except ImportError:
                # Cannot decode HEIC without pillow-heif; fall back to mtime.
                date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)
                return "HEIC_Camera", date_taken, None
        with Image.open(image_path) as img:
            exif = img.getexif()
            size: Optional[Tuple[int, int]] = img.size
            if image_path.suffix.lower() == ".dng":
                # Pillow often reports wrong/tiny sizes for DNG; try the
                # dedicated probes instead.
                width, height = None, None
                dng_size = get_dng_dimensions_exifread(image_path)
                if dng_size:
                    width, height = dng_size
                if not (width and height):
                    raw_size = get_raw_dimensions(image_path)
                    if raw_size:
                        width, height = raw_size
                if width and height:
                    size = (width, height)
                else:
                    # Assume large to avoid being categorized as small
                    size = (9999, 9999)
            make = exif.get(271)   # Make
            model = exif.get(272)  # Model
            camera_info = f"{make or 'Unknown'}_{model or 'Unknown'}".replace(" ", "_")
            # Bug fix: DateTimeOriginal (36867) lives in the Exif sub-IFD
            # (pointer tag 0x8769), not in the base IFD returned by
            # getexif() — the old `exif.get(36867)` virtually always missed
            # it and silently fell back to DateTime (306).
            try:
                exif_ifd = exif.get_ifd(0x8769)
            except Exception:
                exif_ifd = {}
            date_str = exif_ifd.get(36867) or exif.get(36867) or exif.get(306)
            date_taken = None
            if date_str:
                try:
                    date_taken = datetime.strptime(str(date_str), "%Y:%m:%d %H:%M:%S")
                except ValueError:
                    # Bug fix: a malformed date string used to propagate to
                    # the outer handler and discard the camera info entirely;
                    # now we just fall back to the file's mtime below.
                    pass
            if not date_taken:
                date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)
            return camera_info, date_taken, size
    except Exception as e:
        logging.warning(f"Could not get EXIF data for {image_path}: {e}")
        try:
            date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)
            return "Unknown_Unknown", date_taken, None
        except Exception as stat_e:
            logging.error(f"Could not even stat file {image_path}: {stat_e}")
            return None
def get_video_metadata(
    video_path: Path,
) -> Optional[Tuple[str, datetime, None]]:
    """Extract metadata from a video file.

    Returns ("Video", creation_date, None); the creation date falls back
    to the file's mtime whenever the container cannot be parsed or carries
    no creation_date field. Returns None only if even stat() fails.
    """
    try:
        parser = createParser(str(video_path))
        if parser is None:
            # Unparseable container: fall back to the filesystem timestamp.
            return "Video", datetime.fromtimestamp(video_path.stat().st_mtime), None
        with parser:
            metadata = extractMetadata(parser)
            creation = metadata.get("creation_date") if metadata else None
            if not creation:
                creation = datetime.fromtimestamp(video_path.stat().st_mtime)
            return "Video", creation, None
    except Exception as e:
        logging.warning(f"Could not get video metadata for {video_path}: {e}")
        try:
            return "Video", datetime.fromtimestamp(video_path.stat().st_mtime), None
        except Exception as stat_e:
            logging.error(f"Could not stat video file {video_path}: {stat_e}")
            return None
def sanitize_filename(name: str) -> str:
    """Keep only alphanumerics plus '.', '_', '-'; drop every other character."""
    allowed_extra = "._-"
    kept = [ch for ch in name if ch.isalnum() or ch in allowed_extra]
    return "".join(kept).strip()
def process_file(
    file_path: Path, source_dir: Path, dest_path: Path, small_threshold: int
) -> Dict[str, Any]:
    """Classify one file and compute its destination directory.

    Args:
        file_path: File to classify.
        source_dir: Root of the scan (kept for interface compatibility).
        dest_path: Root of the sorted output tree.
        small_threshold: Max dimension below which a photo counts as "small".

    Returns:
        Dict with keys "source", "target_dir" (None when nothing to do)
        and "status" (photo/video/small_photo/unreadable/unsupported/
        no_exif/no_date/skipped).
    """
    info: Dict[str, Any] = {
        "source": file_path,
        "target_dir": None,
        "status": "skipped",
    }
    ext = file_path.suffix.lower()

    if not os.access(file_path, os.R_OK):
        info["target_dir"] = dest_path / "_unreadable_files"
        info["status"] = "unreadable"
        return info

    if ext in PHOTO_EXTENSIONS:
        exif_data = get_exif_data(file_path)
        if exif_data is None:
            info["status"] = "no_exif"
            return info
        camera_info, date_taken, size = exif_data
        if not camera_info or not date_taken:
            info["status"] = "no_exif"
            return info
        # (9999, 9999) is the "unknown but assume large" sentinel from
        # get_exif_data; never classify it as small.
        is_small = bool(size) and max(size) < small_threshold and size != (9999, 9999)
        if is_small:
            info["target_dir"] = dest_path / "_small_photos"
            info["status"] = "small_photo"
        else:
            bucket = date_taken.strftime("%Y/%m")
            info["target_dir"] = dest_path / sanitize_filename(camera_info) / bucket
            info["status"] = "photo"
        return info

    if ext in VIDEO_EXTENSIONS:
        video_meta = get_video_metadata(file_path)
        date_taken = video_meta[1] if video_meta else None
        if not date_taken:
            info["status"] = "no_date"
            return info
        info["target_dir"] = dest_path / "_videos" / date_taken.strftime("%Y/%m")
        info["status"] = "video"
        return info

    info["target_dir"] = dest_path / "_unsupported_files"
    info["status"] = "unsupported"
    return info
def move_file(file_info: Dict[str, Any], dry_run: bool = False) -> str:
    """Move a classified file into its target directory.

    Args:
        file_info: Dict from process_file with "source", "target_dir", "status".
        dry_run: When True, only log what would happen.

    Returns:
        The file's status string on success (or would-be success in dry-run),
        "skipped" when there is nothing to do, "error" on a failed move.
    """
    source_path = file_info["source"]
    target_dir = file_info["target_dir"]
    if not target_dir:
        logging.debug(f"Skipping {source_path.name} ({file_info['status']})")
        return "skipped"
    if source_path.parent == target_dir:
        logging.info(f"Already sorted: {source_path.name}")
        return "skipped"
    sanitized_name = sanitize_filename(source_path.name)
    target_file = target_dir / sanitized_name
    if dry_run:
        logging.info(f"DRY RUN: Would move {source_path.name} to {target_dir}")
        return str(file_info["status"])
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        # Bug fix: collision suffixes previously reused the *unsanitized*
        # source stem/suffix, so the "_1" fallback could reintroduce the very
        # characters sanitize_filename() had just removed. Derive them from
        # the sanitized name instead.
        sanitized_path = Path(sanitized_name)
        stem, suffix = sanitized_path.stem, sanitized_path.suffix
        counter = 1
        while target_file.exists():
            target_file = target_dir / f"{stem}_{counter}{suffix}"
            counter += 1
        shutil.move(str(source_path), str(target_file))
        logging.info(f"Moved {source_path.name} to {target_dir}")
        return str(file_info["status"])
    except (OSError, PermissionError) as e:
        logging.error(f"Failed to move {source_path.name}: {e}")
        return "error"
def cleanup_empty_directories(root_path: Path) -> None:
    """Remove empty directories under *root_path*; the root itself is kept.

    Walks bottom-up so that a directory emptied by removing its children
    is itself removed on the same pass.
    """
    for current_dir, _subdirs, _files in os.walk(root_path, topdown=False):
        dirpath = Path(current_dir)
        if dirpath == root_path:
            continue
        try:
            os.rmdir(dirpath)
        except OSError:
            # Directory not empty (or not removable) — leave it alone.
            continue
        logging.info(f"Removed empty directory: {dirpath}")
def sort_photos(
    source_dir: str,
    destination_dir: Optional[str] = None,
    dry_run: bool = False,
    in_place: bool = False,
    small_threshold: int = 640,
) -> None:
    """Sort photos and videos by camera/model and date.

    Args:
        source_dir: Directory to scan recursively.
        destination_dir: Output root; defaults to <source>/sorted.
        dry_run: When True, log intended moves without touching files.
        in_place: Sort inside source_dir itself (overrides destination_dir).
        small_threshold: Max dimension for the "small photo" bucket.
    """
    source_path = Path(source_dir).resolve()
    if in_place:
        dest_path = source_path
    elif destination_dir:
        dest_path = Path(destination_dir).resolve()
    else:
        dest_path = source_path / "sorted"
    if not source_path.is_dir():
        logging.error(f"Source directory {source_dir} does not exist.")
        return
    all_files = [entry for entry in source_path.rglob("*") if entry.is_file()]
    # Analysis is I/O-bound (EXIF reads), so fan it out across threads.
    with ThreadPoolExecutor() as executor:
        pending = [
            executor.submit(process_file, entry, source_path, dest_path, small_threshold)
            for entry in all_files
        ]
        processed_files: List[Dict[str, Any]] = [
            future.result()
            for future in tqdm(pending, total=len(all_files), desc="Analyzing files")
        ]
    counts = {
        status: 0
        for status in (
            "photo",
            "video",
            "small_photo",
            "unreadable",
            "unsupported",
            "skipped",
            "error",
        )
    }
    # Moves are serialized: collision renaming in move_file is not thread-safe.
    for entry_info in tqdm(processed_files, desc="Moving files"):
        outcome = move_file(entry_info, dry_run)
        if outcome in counts:
            counts[outcome] += 1
    logging.info(f"\n--- Sorting Summary ---")
    logging.info(f"Photos moved: {counts['photo']}")
    logging.info(f"Videos moved: {counts['video']}")
    logging.info(f"Small photos moved: {counts['small_photo']}")
    logging.info(f"Unreadable files: {counts['unreadable']}")
    logging.info(f"Unsupported files: {counts['unsupported']}")
    logging.info(f"Skipped files: {counts['skipped']}")
    logging.info(f"Errors: {counts['error']}")
    logging.info("----------------------")
    if not dry_run:
        logging.info("Cleaning up empty directories...")
        cleanup_empty_directories(source_path)
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line interface for the sorter."""
    parser = argparse.ArgumentParser(
        description="Sort photos and videos by camera/model and date from EXIF data.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("source", help="Source directory containing media files.")
    parser.add_argument(
        "-d",
        "--destination",
        help="Destination directory. Default: 'sorted' subdirectory in source.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simulate the sort without moving any files.",
    )
    parser.add_argument(
        "--in-place",
        action="store_true",
        help="Sort files within the source directory. Overrides --destination.",
    )
    parser.add_argument(
        "--small-threshold",
        type=int,
        default=640,
        help="Max dimension for a photo to be considered 'small'.",
    )
    return parser


def main() -> None:
    """Parse command-line arguments and run the sorter."""
    parser = _build_arg_parser()
    args = parser.parse_args()
    # --in-place and --destination are mutually exclusive.
    if args.in_place and args.destination:
        parser.error("argument --destination: not allowed with argument --in-place")
    sort_photos(
        args.source, args.destination, args.dry_run, args.in_place, args.small_threshold
    )


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Claude made this for me. It worked well on a directory with 50k photos and random other files.