Skip to content

Instantly share code, notes, and snippets.

@anandsaha
Created May 26, 2024 10:41
Show Gist options
  • Save anandsaha/98a7f50db3a7d4b6a1d3abf8b8600e4a to your computer and use it in GitHub Desktop.
Save anandsaha/98a7f50db3a7d4b6a1d3abf8b8600e4a to your computer and use it in GitHub Desktop.
import shutil
import time
from datetime import datetime
from exif import Image
from pathlib import Path
import re
import ffmpeg
import logging
from datetime import datetime, timedelta
def setup_logger(log_file):
    """Return the shared ``'log_message'`` logger, configured for file and console.

    The logger and both handlers are set to INFO and share the format
    ``'%(asctime)s - %(message)s'``.  Handlers are attached only when an
    equivalent one is not already present, so calling this function repeatedly
    does not duplicate log lines or open extra file handles.

    Args:
        log_file: Path of the file that log records are appended to.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    import os  # local import: this module does not import os at file level

    logger = logging.getLogger('log_message')
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(message)s')

    # FileHandler stores its target as an absolute path in ``baseFilename``.
    target = os.path.abspath(os.fspath(log_file))
    has_file_handler = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == target
        for handler in logger.handlers
    )
    # FileHandler subclasses StreamHandler, so exclude it when looking for
    # the plain console handler.
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )

    if not has_file_handler:
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    if not has_console_handler:
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    return logger
def log_message(message, log_file='logfile.log'):
    """Print ``message`` to standard output.

    Args:
        message: Text to report.
        log_file: Currently unused.  It is kept so existing callers that pass
            it keep working; the file-logging code that used it sat after an
            unconditional ``return`` and could never run, so it was removed.

    Returns:
        None.
    """
    print(message)
def create_folder(path):
    """Ensure the directory ``path`` exists.

    Missing parent directories are created as well, and an already existing
    directory is not treated as an error.

    Args:
        path: ``pathlib.Path`` of the directory to create.
    """
    path.mkdir(exist_ok=True, parents=True)
# Root of the consolidated media folder that is scanned recursively.
source_folder = Path(r"E:\Staging\Google Takeout\2")

# Root under which the organized copies are written.
destination_folder = Path(r"E:\Staging\Google Takeout_arranged")

# Lower-case file suffixes that are handled as videos.
video_extensions = {
    '.mp4',
    '.avi',
    '.mov',
    '.mkv',
    '.flv',
    '.wmv',
    '.m4v',
}
def extract_date_from_filename(filename):
    """Derive a date string from an ``IMG_YYYYMMDD...`` / ``VID_YYYYMMDD...`` name.

    Args:
        filename: File name (or stem) to inspect.  Only a match at the very
            start of the string is considered.

    Returns:
        ``"YYYY:MM:DD 00:00:00"`` when the name starts with ``IMG_`` or
        ``VID_`` followed by eight digits that form a real calendar date,
        otherwise ``None``.
    """
    match = re.match(r"(?:IMG|VID)_(\d{4})(\d{2})(\d{2})", filename)
    if match is None:
        return None

    year, month, day = match.groups()
    try:
        # Reject digit runs that are not a real date (e.g. month 13), so the
        # caller's strptime() never receives an unparseable string.
        datetime(int(year), int(month), int(day))
    except ValueError:
        return None
    return f"{year}:{month}:{day} 00:00:00"
def copy_file_with_retries(src, dest, retries=5, delay=1):
    """Copy ``src`` to ``dest`` with ``shutil.copy2``, retrying on sharing violations.

    Only a ``PermissionError`` whose ``winerror`` is 32 is retried; any other
    error propagates to the caller unchanged.

    Args:
        src: Source file path.
        dest: Destination file path.
        retries: Maximum number of copy attempts.
        delay: Seconds to wait between attempts.

    Returns:
        True when the copy succeeded, False when every attempt hit the
        retryable error.

    Raises:
        PermissionError: For permission errors other than ``winerror`` 32.
        OSError: Any other error raised by ``shutil.copy2``.
    """
    for attempt in range(1, retries + 1):
        try:
            shutil.copy2(str(src), str(dest))
            return True
        except PermissionError as e:
            # ``winerror`` exists only on Windows; reading it directly on other
            # platforms raises AttributeError and hides the real error.
            if getattr(e, 'winerror', None) != 32:
                raise
            # Announce and wait only when another attempt will follow.
            if attempt < retries:
                print(f"Retrying {src} in {delay} seconds...")
                time.sleep(delay)
    log_message(f"Failed to copy {src} after {retries} attempts")
    return False
def get_video_creation_date(file_path):
    """Read a video's ``creation_time`` tag via ``ffmpeg.probe``.

    Stream-level tags are checked first, in stream order, followed by the
    container-level ``format`` tags.  The first value that parses is returned.

    Args:
        file_path: Path of the video file to probe.

    Returns:
        A naive ``datetime`` parsed from the tag, or ``None`` when probing
        fails, no ``creation_time`` tag is present, or no value can be parsed.
    """
    try:
        probe = ffmpeg.probe(str(file_path))
    except Exception:
        # Deliberate best-effort: any probing failure means "date unknown",
        # and the caller falls back to the file name.
        return None

    # Stream tags first, then the container ("format") tags.
    tag_sources = list(probe.get('streams') or [])
    tag_sources.append(probe.get('format') or {})

    for source in tag_sources:
        creation_time = (source.get('tags') or {}).get('creation_time')
        if not creation_time:
            continue
        # Accept timestamps with and without fractional seconds.
        for fmt in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'):
            try:
                return datetime.strptime(creation_time, fmt)
            except (ValueError, TypeError):
                continue
    return None
def get_week_start(date_obj):
    """Return the Sunday that starts the week containing ``date_obj``.

    Args:
        date_obj: A ``datetime`` (or ``date``); any time-of-day part is kept.

    Returns:
        ``date_obj`` moved back to the most recent Sunday.  A Sunday maps to
        itself.
    """
    # weekday() is Monday=0 ... Sunday=6, so (weekday + 1) % 7 is the number
    # of days since the last Sunday.  Without the modulo a Sunday would be
    # pushed back a full week.
    days_since_sunday = (date_obj.weekday() + 1) % 7
    return date_obj - timedelta(days=days_since_sunday)
# Walk every file below source_folder and copy it to
#   destination_folder/<YYYY>/<MM-Mon>/Week starting <DD-Mon-YYYY>/
# Files without a usable date go to destination_folder/Unknown_Date, and images
# whose date cannot be read or parsed go to destination_folder/errorred.
# NOTE(review): the destination name is always file_path.name, so two source
# files with the same name that land in the same folder overwrite one another.
for file_path in source_folder.rglob('*'):
    if not file_path.is_file():
        continue

    file_extension = file_path.suffix.lower()
    date_obj = None

    if file_extension in video_extensions:
        # Video: prefer the embedded creation time, then the file name.
        date_obj = get_video_creation_date(file_path)
        if not date_obj:
            date_taken = extract_date_from_filename(file_path.stem)
            if date_taken:
                try:
                    date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                except ValueError:
                    # A name that only looks like a date must not abort the
                    # whole run; treat the file as undated instead.
                    date_obj = None
    else:
        # Everything else is handled as an image.
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > 20:  # assuming no image file will be bigger than this MB
            continue
        with file_path.open('rb') as image_file:
            try:
                img = Image(image_file)
                try:
                    if hasattr(img, 'datetime_original'):
                        date_taken = img.datetime_original
                        date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                    else:
                        date_taken = extract_date_from_filename(file_path.stem)
                        if date_taken:
                            date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                except Exception as e:
                    # The date could not be read or parsed: set the file aside.
                    log_message(f"Sending file to errorred folder: {file_path.name}: {e}")
                    folder_path = destination_folder / "errorred"
                    create_folder(folder_path)
                    copy_file_with_retries(file_path, folder_path / file_path.name)
                    continue
            except Exception as e:
                # The file could not be opened as an image.  Names containing
                # "json" are skipped without a log line.
                if "json" not in file_path.name:
                    log_message(f"Could not process file {file_path.name}: {e}")
                continue

    if date_obj:
        year = date_obj.strftime('%Y')
        month = date_obj.strftime('%m-%b')
        week_start = get_week_start(date_obj)
        folder_name = f"Week starting {week_start.strftime('%d-%b-%Y')}"
        folder_path = destination_folder / year / month / folder_name
        create_folder(folder_path)
        # Report success only when the copy actually happened;
        # copy_file_with_retries logs its own failure message.
        if copy_file_with_retries(file_path, folder_path / file_path.name):
            log_message(f"Copied file {file_path.name} to {folder_path}")
    else:
        unknown_folder_path = destination_folder / "Unknown_Date"
        create_folder(unknown_folder_path)
        if copy_file_with_retries(file_path, unknown_folder_path / file_path.name):
            log_message(f"Copied file {file_path.name} to Unknown_Date folder")

print("Organization complete!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment