Skip to content

Instantly share code, notes, and snippets.

@mvexel
Last active September 2, 2025 00:06
Show Gist options
  • Select an option

  • Save mvexel/2cb64c84c3323375d4e460b4adca798c to your computer and use it in GitHub Desktop.

Select an option

Save mvexel/2cb64c84c3323375d4e460b4adca798c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Photo sorting script that organizes photos by camera/model and then by year/month
based on EXIF data.
"""
import os
import shutil
import argparse
import logging
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple, Dict, Any, List
from PIL import Image
from hachoir.parser import createParser # type: ignore
from hachoir.metadata import extractMetadata # type: ignore
from tqdm import tqdm # type: ignore
# Root logger setup, done once at import time: INFO and above, each record
# prefixed with a timestamp and its level name.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Lower-case file suffixes (leading dot included) that are handled as still
# photos. Callers compare against ``Path.suffix.lower()``.
PHOTO_EXTENSIONS = {
    # General-purpose image formats
    ".jpg", ".jpeg", ".png", ".tiff", ".tif", ".heic", ".webp",
    # Camera RAW containers
    ".raw", ".cr2", ".nef", ".arw", ".dng",
}
# Lower-case file suffixes (leading dot included) that are handled as videos.
VIDEO_EXTENSIONS = {
    ".mp4", ".mov", ".avi", ".mkv",
    ".wmv", ".flv", ".webm", ".m4v",
    ".3gp", ".mts", ".m2ts",
}
def get_dng_dimensions_exifread(image_path: Path) -> Optional[Tuple[int, int]]:
    """Read a DNG's pixel dimensions from its tags via the optional ExifRead package.

    Args:
        image_path: File to inspect.

    Returns:
        ``(width, height)`` when both a width tag and a length tag are found,
        otherwise None. None is also returned when ``exifread`` is not
        installed or anything goes wrong while reading; the reason is logged
        at DEBUG level.
    """
    try:
        # Imported lazily so the script still runs without exifread installed.
        import exifread  # type: ignore

        with open(image_path, "rb") as handle:
            exif_tags = exifread.process_file(handle, details=False)

        # Prefer the EXIF-level tags; fall back to the image-level ones.
        width_entry = exif_tags.get("EXIF ExifImageWidth") or exif_tags.get(
            "Image ImageWidth"
        )
        height_entry = exif_tags.get("EXIF ExifImageLength") or exif_tags.get(
            "Image ImageLength"
        )
        if not (width_entry and height_entry):
            return None
        return (
            int(str(width_entry.values[0])),
            int(str(height_entry.values[0])),
        )
    except Exception as e:  # also covers ImportError for the optional package
        logging.debug(
            f"Could not get DNG dimensions with exifread for {image_path}: {e}"
        )
    return None
def get_raw_dimensions(image_path: Path) -> Optional[Tuple[int, int]]:
    """Read a RAW file's sensor dimensions via the optional rawpy package.

    Args:
        image_path: File to inspect.

    Returns:
        ``(raw_width, raw_height)`` as reported by rawpy, or None when rawpy
        is not installed or the file cannot be read (logged at DEBUG level).
    """
    try:
        # Imported lazily so the script still runs without rawpy installed.
        import rawpy  # type: ignore

        with rawpy.imread(str(image_path)) as raw_image:
            sizes = raw_image.sizes
            return sizes.raw_width, sizes.raw_height
    except Exception as e:  # also covers ImportError for the optional package
        logging.debug(f"Could not get RAW dimensions with rawpy for {image_path}: {e}")
    return None
def _parse_exif_datetime(raw_value: Any) -> Optional[datetime]:
    """Parse an EXIF ``YYYY:MM:DD HH:MM:SS`` timestamp without raising.

    Returns None when the value is missing, empty, or malformed (for example
    ``0000:00:00 00:00:00``), so that one bad date tag cannot discard the
    camera make/model that was read successfully.
    """
    if not raw_value:
        return None
    if isinstance(raw_value, bytes):
        raw_value = raw_value.decode("ascii", errors="ignore")
    # Some writers pad the field with NULs or whitespace.
    cleaned = str(raw_value).strip(" \t\r\n\x00")
    try:
        return datetime.strptime(cleaned, "%Y:%m:%d %H:%M:%S")
    except ValueError:
        return None


def get_exif_data(
    image_path: Path,
) -> Optional[Tuple[str, datetime, Optional[Tuple[int, int]]]]:
    """Extract camera name, capture date, and pixel size from an image file.

    Args:
        image_path: Image file to inspect.

    Returns:
        ``(camera_info, date_taken, size)`` where

        * ``camera_info`` is ``"<make>_<model>"`` with spaces replaced by
          underscores (``"Unknown"`` for a missing part), ``"HEIC_Camera"``
          for HEIC files when ``pillow_heif`` is not installed, or
          ``"Unknown_Unknown"`` when the image could not be read at all;
        * ``date_taken`` is the first parseable value among DateTimeOriginal
          and DateTime, else the file's modification time;
        * ``size`` is ``(width, height)``, the sentinel ``(9999, 9999)`` for
          a DNG whose dimensions could not be determined, or None when the
          image could not be opened.

        None is returned only when the file cannot even be stat'ed.
    """
    try:
        suffix = image_path.suffix.lower()

        if suffix == ".heic":
            try:
                import pillow_heif  # type: ignore

                pillow_heif.register_heif_opener()
            except ImportError:
                # Without the HEIF plugin Pillow cannot open the file; sort it
                # by modification time under a generic camera name instead.
                date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)
                return "HEIC_Camera", date_taken, None

        with Image.open(image_path) as img:
            exif = img.getexif()
            size: Optional[Tuple[int, int]] = img.size

            if suffix == ".dng":
                # The size Pillow reports for a DNG is not used (presumably it
                # can be an embedded preview -- TODO confirm); ask the
                # dedicated readers instead.
                dims = get_dng_dimensions_exifread(image_path)
                if not (dims and all(dims)):
                    dims = get_raw_dimensions(image_path)
                # Assume large when unknown to avoid being categorized as small.
                size = dims if dims and all(dims) else (9999, 9999)

            make = exif.get(271)  # Make
            model = exif.get(272)  # Model
            camera_info = f"{make or 'Unknown'}_{model or 'Unknown'}".replace(" ", "_")

            # DateTimeOriginal (36867) is stored in the Exif sub-IFD (pointer
            # tag 0x8769), which the base mapping returned by getexif() does
            # not include, so read it through get_ifd().
            try:
                exif_ifd = exif.get_ifd(0x8769) or {}
            except Exception as ifd_error:
                # Older Pillow without get_ifd(), or a corrupt sub-IFD: keep
                # going with the base IFD rather than losing make/model.
                logging.debug(f"Could not read Exif IFD for {image_path}: {ifd_error}")
                exif_ifd = {}

            date_taken = None
            # Preference order: DateTimeOriginal (sub-IFD, then base IFD for
            # files that carry it there), then DateTime.
            for raw_date in (exif_ifd.get(36867), exif.get(36867), exif.get(306)):
                date_taken = _parse_exif_datetime(raw_date)
                if date_taken is not None:
                    break
            if date_taken is None:
                date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)

            return camera_info, date_taken, size
    except Exception as e:
        logging.warning(f"Could not get EXIF data for {image_path}: {e}")
        try:
            date_taken = datetime.fromtimestamp(image_path.stat().st_mtime)
            return "Unknown_Unknown", date_taken, None
        except Exception as stat_e:
            logging.error(f"Could not even stat file {image_path}: {stat_e}")
            return None
def get_video_metadata(
    video_path: Path,
) -> Optional[Tuple[str, datetime, None]]:
    """Determine the date used to sort a video file.

    Args:
        video_path: Video file to inspect.

    Returns:
        ``("Video", date_taken, None)``. ``date_taken`` is the
        ``creation_date`` value from the extracted metadata when one is
        available, otherwise the file's modification time. None is returned
        only when metadata extraction failed and the file could not be
        stat'ed either.
    """
    try:
        parser = createParser(str(video_path))
        if parser:
            with parser:
                metadata = extractMetadata(parser)
                date_taken = None
                if metadata:
                    date_taken = metadata.get("creation_date")
                if not date_taken:
                    date_taken = datetime.fromtimestamp(video_path.stat().st_mtime)
                return "Video", date_taken, None
        # No parser could be created for this file: use the modification time.
        return "Video", datetime.fromtimestamp(video_path.stat().st_mtime), None
    except Exception as e:
        logging.warning(f"Could not get video metadata for {video_path}: {e}")
        try:
            modified = datetime.fromtimestamp(video_path.stat().st_mtime)
            return "Video", modified, None
        except Exception as stat_e:
            logging.error(f"Could not stat video file {video_path}: {stat_e}")
            return None
def sanitize_filename(name: str) -> str:
    """Return *name* with every character dropped except alphanumerics, ``.``, ``_`` and ``-``.

    Disallowed characters (spaces and path separators included) are removed,
    not replaced.
    """
    kept_chars = [ch for ch in name if ch.isalnum() or ch in "._-"]
    return "".join(kept_chars).strip()
def process_file(
    file_path: Path, source_dir: Path, dest_path: Path, small_threshold: int
) -> Dict[str, Any]:
    """Classify one file and decide which directory it should be moved to.

    Args:
        file_path: File to classify.
        source_dir: Root of the scan (not used by this function).
        dest_path: Root directory under which targets are built.
        small_threshold: A photo whose largest dimension is below this value
            is routed to ``_small_photos``.

    Returns:
        A dict with keys ``source`` (the input path), ``target_dir`` (a Path,
        or None when the file should be left alone) and ``status`` -- one of
        ``unreadable``, ``photo``, ``small_photo``, ``video``,
        ``unsupported``, ``no_exif`` or ``no_date``.
    """
    result: Dict[str, Any] = {
        "source": file_path,
        "target_dir": None,
        "status": "skipped",
    }
    extension = file_path.suffix.lower()

    if not os.access(file_path, os.R_OK):
        result.update(target_dir=dest_path / "_unreadable_files", status="unreadable")
        return result

    if extension in PHOTO_EXTENSIONS:
        exif_data = get_exif_data(file_path)
        if not exif_data:
            result["status"] = "no_exif"
            return result

        camera_info, date_taken, size = exif_data
        if not camera_info or not date_taken:
            result["status"] = "no_exif"
            return result

        # (9999, 9999) is the "dimensions unknown" sentinel and must never be
        # treated as small, whatever the threshold.
        is_small = bool(size) and max(size) < small_threshold and size != (9999, 9999)
        if is_small:
            result.update(target_dir=dest_path / "_small_photos", status="small_photo")
        else:
            camera_dir = sanitize_filename(camera_info)
            result.update(
                target_dir=dest_path / camera_dir / date_taken.strftime("%Y/%m"),
                status="photo",
            )
        return result

    if extension in VIDEO_EXTENSIONS:
        video_meta = get_video_metadata(file_path)
        if not video_meta:
            result["status"] = "no_date"
            return result

        date_taken = video_meta[1]
        if not date_taken:
            result["status"] = "no_date"
            return result

        result.update(
            target_dir=dest_path / "_videos" / date_taken.strftime("%Y/%m"),
            status="video",
        )
        return result

    result.update(target_dir=dest_path / "_unsupported_files", status="unsupported")
    return result
def move_file(file_info: Dict[str, Any], dry_run: bool = False) -> str:
    """Move one classified file into its target directory.

    Args:
        file_info: Dict with ``source`` (Path), ``target_dir`` (Path or None)
            and ``status`` (str), as produced by ``process_file``.
        dry_run: When true, only log what would happen.

    Returns:
        ``"skipped"`` when there is no target directory or the file already
        sits in it, ``"error"`` when the move failed, otherwise the file's
        ``status`` (also returned for a dry run).
    """
    source_path = file_info["source"]
    target_dir = file_info["target_dir"]

    if not target_dir:
        logging.debug(f"Skipping {source_path.name} ({file_info['status']})")
        return "skipped"
    if source_path.parent == target_dir:
        logging.info(f"Already sorted: {source_path.name}")
        return "skipped"
    if dry_run:
        logging.info(f"DRY RUN: Would move {source_path.name} to {target_dir}")
        return str(file_info["status"])

    # Sanitize stem and suffix once and reuse them for every candidate name,
    # so collision-renamed files are sanitized the same way as the first
    # attempt. A stem with no usable characters falls back to "unnamed"
    # rather than producing an empty or hidden file name.
    safe_stem = sanitize_filename(source_path.stem) or "unnamed"
    safe_suffix = sanitize_filename(source_path.suffix)
    target_file = target_dir / f"{safe_stem}{safe_suffix}"

    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        # Never overwrite: append _1, _2, ... until the name is free.
        counter = 1
        while target_file.exists():
            target_file = target_dir / f"{safe_stem}_{counter}{safe_suffix}"
            counter += 1
        shutil.move(str(source_path), str(target_file))
        logging.info(f"Moved {source_path.name} to {target_dir}")
        return str(file_info["status"])
    except OSError as e:  # PermissionError is a subclass of OSError
        logging.error(f"Failed to move {source_path.name}: {e}")
        return "error"
def cleanup_empty_directories(root_path: Path) -> None:
    """Delete every empty directory below *root_path*, leaving the root itself.

    The tree is walked bottom-up so a directory whose only contents were
    empty subdirectories is removed in the same pass.
    """
    for current_dir, _subdirs, _files in os.walk(root_path, topdown=False):
        if Path(current_dir) != root_path:
            try:
                os.rmdir(current_dir)
            except OSError:
                # Directory not empty or other error: leave it in place.
                continue
            else:
                logging.info(f"Removed empty directory: {current_dir}")
def sort_photos(
    source_dir: str,
    destination_dir: Optional[str] = None,
    dry_run: bool = False,
    in_place: bool = False,
    small_threshold: int = 640,
) -> None:
    """Sort every file under *source_dir* by camera/model and year/month.

    Args:
        source_dir: Directory that is scanned recursively.
        destination_dir: Root for the sorted tree; defaults to a ``sorted``
            subdirectory of the source.
        dry_run: Log the planned moves without touching any file.
        in_place: Use the source directory itself as the destination root.
        small_threshold: Passed to ``process_file`` as the small-photo cutoff.

    Files are classified in a thread pool, then moved one at a time. A
    summary is logged and, unless this is a dry run, empty directories under
    the source are removed afterwards.
    """
    source_path = Path(source_dir).resolve()
    if destination_dir:
        dest_path = Path(destination_dir).resolve()
    else:
        dest_path = source_path / "sorted"
    if in_place:
        dest_path = source_path

    if not source_path.is_dir():
        logging.error(f"Source directory {source_dir} does not exist.")
        return

    candidates = [entry for entry in source_path.rglob("*") if entry.is_file()]

    # Phase 1: classify files concurrently; results keep submission order.
    with ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(process_file, candidate, source_path, dest_path, small_threshold)
            for candidate in candidates
        ]
        processed_files: List[Dict[str, Any]] = [
            job.result()
            for job in tqdm(pending, total=len(candidates), desc="Analyzing files")
        ]

    # Phase 2: move files sequentially and tally the outcome of each move.
    counts = dict.fromkeys(
        (
            "photo",
            "video",
            "small_photo",
            "unreadable",
            "unsupported",
            "skipped",
            "error",
        ),
        0,
    )
    for file_info in tqdm(processed_files, desc="Moving files"):
        outcome = move_file(file_info, dry_run)
        if outcome in counts:
            counts[outcome] += 1

    summary_rows = (
        ("Photos moved", "photo"),
        ("Videos moved", "video"),
        ("Small photos moved", "small_photo"),
        ("Unreadable files", "unreadable"),
        ("Unsupported files", "unsupported"),
        ("Skipped files", "skipped"),
        ("Errors", "error"),
    )
    logging.info("\n--- Sorting Summary ---")
    for label, key in summary_rows:
        logging.info(f"{label}: {counts[key]}")
    logging.info("----------------------")

    if dry_run:
        return
    logging.info("Cleaning up empty directories...")
    cleanup_empty_directories(source_path)
def main() -> None:
    """Parse command-line arguments and run the sort.

    Exits with an argparse usage error when ``--in-place`` and
    ``--destination`` are given together.
    """
    parser = argparse.ArgumentParser(
        description="Sort photos and videos by camera/model and date from EXIF data.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("source", help="Source directory containing media files.")
    parser.add_argument(
        "-d",
        "--destination",
        help="Destination directory. Default: 'sorted' subdirectory in source.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simulate the sort without moving any files.",
    )
    parser.add_argument(
        "--in-place",
        action="store_true",
        # The combination is rejected below, so the help must not promise
        # that --in-place silently overrides --destination.
        help="Sort files within the source directory. "
        "Cannot be combined with --destination.",
    )
    parser.add_argument(
        "--small-threshold",
        type=int,
        default=640,
        help="Max dimension for a photo to be considered 'small'.",
    )
    args = parser.parse_args()

    if args.in_place and args.destination:
        parser.error("argument --destination: not allowed with argument --in-place")

    sort_photos(
        args.source, args.destination, args.dry_run, args.in_place, args.small_threshold
    )


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@mvexel
Copy link
Author

mvexel commented Aug 28, 2025

Claude made this for me. It worked well on a directory with 50k photos and random other files.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment