# Created May 26, 2024 10:41
# Save anandsaha/98a7f50db3a7d4b6a1d3abf8b8600e4a to your computer and use it in GitHub Desktop.
# This file contains hidden or bidirectional Unicode text that may be interpreted or compiled
# differently than what appears below. To review, open the file in an editor that reveals
# hidden Unicode characters.
# Learn more about bidirectional Unicode characters
import logging
import re
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path

import ffmpeg
from exif import Image
def setup_logger(log_file):
    """Return the shared 'log_message' logger, writing to *log_file* and the console.

    ``logging.getLogger`` returns the same logger object for the same name, so
    handlers are attached only when an equivalent one is not already present.
    Calling this function repeatedly therefore does not duplicate output.

    Args:
        log_file: Path of the file that log records are appended to.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger('log_message')
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(message)s')
    wanted_file = Path(log_file).resolve()

    has_file_handler = any(
        isinstance(handler, logging.FileHandler)
        and Path(handler.baseFilename).resolve() == wanted_file
        for handler in logger.handlers
    )
    # FileHandler subclasses StreamHandler, so exclude it when looking for
    # an existing console handler.
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )

    if not has_file_handler:
        # File handler: records at INFO and above go to log_file.
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    if not has_console_handler:
        # Console handler: same INFO threshold as the file handler.
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    return logger
def log_message(message, log_file='logfile.log'):
    """Print *message* to standard output.

    Args:
        message: Text to report.
        log_file: Accepted for backward compatibility and currently unused;
            this function only prints and does not write to a log file.
    """
    print(message)
def create_folder(path):
    """Ensure the directory *path* exists, creating missing parents as needed.

    Does nothing when the directory is already present.
    """
    path.mkdir(exist_ok=True, parents=True)
# Folder holding the consolidated media files to be organized (searched recursively).
source_folder = Path(r"E:\Staging\Google Takeout\2")

# Root folder that receives the organized copies.
destination_folder = Path(r"E:\Staging\Google Takeout_arranged")

# Lower-case file suffixes that are handled as videos; any other file is
# handled as an image.
video_extensions = {
    '.mp4',
    '.avi',
    '.mov',
    '.mkv',
    '.flv',
    '.wmv',
    '.m4v',
}
def extract_date_from_filename(filename):
    """Extract a date from a name starting with ``IMG_YYYYMMDD`` or ``VID_YYYYMMDD``.

    Args:
        filename: File name (or stem) to inspect.

    Returns:
        The date as ``"YYYY:MM:DD 00:00:00"`` (the EXIF-style layout the callers
        parse with ``'%Y:%m:%d %H:%M:%S'``), or ``None`` when the name does not
        match or the digits do not form a real calendar date.
    """
    match = re.match(r"(?:IMG|VID)_(\d{4})(\d{2})(\d{2})", filename)
    if not match:
        return None

    year, month, day = match.groups()
    try:
        # Reject impossible dates such as month 13 here, so callers never
        # receive a string that strptime would refuse.
        datetime(int(year), int(month), int(day))
    except ValueError:
        return None
    return f"{year}:{month}:{day} 00:00:00"
def copy_file_with_retries(src, dest, retries=5, delay=1):
    """Copy *src* to *dest* with ``shutil.copy2``, retrying on sharing violations.

    Only a ``PermissionError`` whose ``winerror`` is 32 (Windows: the file is
    in use by another process) is retried; any other error propagates.

    Args:
        src: Source file path.
        dest: Destination file path.
        retries: Maximum number of copy attempts.
        delay: Seconds to wait between attempts.

    Returns:
        True when the copy succeeded, False when every attempt hit a sharing
        violation.

    Raises:
        PermissionError: For permission failures other than winerror 32.
    """
    for attempt in range(1, retries + 1):
        try:
            shutil.copy2(str(src), str(dest))
            return True
        except PermissionError as e:
            # `winerror` exists only on Windows; getattr keeps the original
            # PermissionError visible on other platforms instead of masking
            # it with an AttributeError.
            if getattr(e, 'winerror', None) != 32:
                raise
            # Announce a retry and wait only when another attempt follows.
            if attempt < retries:
                print(f"Retrying {src} in {delay} seconds...")
                time.sleep(delay)
    log_message(f"Failed to copy {src} after {retries} attempts")
    return False
def get_video_creation_date(file_path):
    """Read a video's ``creation_time`` tag via ``ffmpeg.probe``.

    Stream-level tags are checked first, then the container-level
    (``format``) tags. The timestamp is parsed with or without fractional
    seconds.

    Args:
        file_path: Path of the video file.

    Returns:
        A naive ``datetime`` parsed from the tag, or ``None`` when probing
        fails, no tag is present, or the value has an unrecognized layout.
    """
    try:
        probe = ffmpeg.probe(str(file_path))
    except Exception:
        # Best effort: a file that cannot be probed simply has no metadata
        # date, and the caller falls back to the file name.
        return None

    candidates = [
        (stream.get('tags') or {}).get('creation_time')
        for stream in probe.get('streams', [])
    ]
    candidates.append(((probe.get('format') or {}).get('tags') or {}).get('creation_time'))

    creation_time = next((value for value in candidates if value), None)
    if not creation_time:
        return None

    for fmt in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%d %H:%M:%S'):
        try:
            return datetime.strptime(creation_time, fmt)
        except (ValueError, TypeError):
            continue
    return None
def get_week_start(date_obj):
    """Return the Sunday that starts the week containing *date_obj*.

    A Sunday maps to itself. The time-of-day component of *date_obj*, if any,
    is kept unchanged.

    Args:
        date_obj: A ``datetime`` (or ``date``) value.

    Returns:
        *date_obj* moved back to the most recent Sunday.
    """
    # weekday() is Monday=0 ... Sunday=6, so (weekday + 1) % 7 is the number
    # of days elapsed since Sunday (0 for a Sunday itself).
    days_since_sunday = (date_obj.weekday() + 1) % 7
    return date_obj - timedelta(days=days_since_sunday)
# Walk every file under source_folder (recursively) and copy it to
# destination_folder/<year>/<month>/Week starting <date>. Files without a
# usable date go to "Unknown_Date"; images whose date fails to parse go to
# "errorred".
# NOTE(review): the destination path uses only the file name, so two source
# files with the same name in different subfolders target the same path and
# the later copy overwrites the earlier one - confirm this is acceptable.
for file_path in source_folder.rglob('*'):
    if not file_path.is_file():
        continue

    file_extension = file_path.suffix.lower()
    date_obj = None

    if file_extension in video_extensions:
        # Videos: prefer the metadata date, then fall back to the file name.
        date_obj = get_video_creation_date(file_path)
        if not date_obj:
            date_taken = extract_date_from_filename(file_path.stem)
            if date_taken:
                try:
                    date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                except ValueError:
                    # An impossible date in the name is treated as "no date"
                    # instead of aborting the whole run.
                    date_obj = None
    else:
        # Everything that is not a video is handled as an image.
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > 20:  # assuming no image file will be bigger than this MB
            continue
        with file_path.open('rb') as image_file:
            try:
                img = Image(image_file)
                try:
                    if hasattr(img, 'datetime_original'):
                        date_taken = img.datetime_original
                        date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                    else:
                        date_taken = extract_date_from_filename(file_path.stem)
                        if date_taken:
                            date_obj = datetime.strptime(date_taken, '%Y:%m:%d %H:%M:%S')
                except Exception as e:
                    log_message(f"Sending file to errorred folder: {file_path.name}: {e}")
                    folder_path = destination_folder / "errorred"
                    create_folder(folder_path)
                    copy_file_with_retries(file_path, folder_path / file_path.name)
                    continue
            except Exception as e:
                # Files the EXIF reader cannot open are skipped; those with
                # "json" in their name are skipped without a message.
                # NOTE(review): the skip is taken to apply to every unreadable
                # file, not only the logged ones - confirm.
                if "json" not in file_path.name:
                    log_message(f"Could not process file {file_path.name}: {e}")
                continue

    if date_obj:
        year = date_obj.strftime('%Y')
        month = date_obj.strftime('%m-%b')
        week_start = get_week_start(date_obj)
        folder_name = f"Week starting {week_start.strftime('%d-%b-%Y')}"
        folder_path = destination_folder / year / month / folder_name
        create_folder(folder_path)
        # Report success only when the copy actually happened; the helper
        # logs its own failure message.
        if copy_file_with_retries(file_path, folder_path / file_path.name):
            log_message(f"Copied file {file_path.name} to {folder_path}")
    else:
        unknown_folder_path = destination_folder / "Unknown_Date"
        create_folder(unknown_folder_path)
        if copy_file_with_retries(file_path, unknown_folder_path / file_path.name):
            log_message(f"Copied file {file_path.name} to Unknown_Date folder")

print("Organization complete!")
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment