Skip to content

Instantly share code, notes, and snippets.

@soraxas
Last active October 18, 2023 06:53
Show Gist options
  • Save soraxas/f2c0cc5f2207997932afadad35971080 to your computer and use it in GitHub Desktop.
Save soraxas/f2c0cc5f2207997932afadad35971080 to your computer and use it in GitHub Desktop.
A script that rename recorded videos into friendly name
#!/usr/bin/env python3
import glob
import dataclasses
import subprocess
import os
import tqdm
from abc import ABC
from datetime import datetime
from pathlib import Path
from typing import List
from collections import defaultdict
# from joblib import Parallel, delayed
ROOT = Path(__file__).parent / "DCIM" / "100GOPRO"
GLOBAL_RENAME_REGISTRY = set()
@dataclasses.dataclass
class VideoPart:
def __init__(self, identity) -> None:
self.identity = identity
@property
def low_res(self) -> Path:
return ROOT / f"{self.identity}.LRV"
@property
def thumbnail(self) -> Path:
return ROOT / f"{self.identity}.THM"
@property
def video(self) -> Path:
return ROOT / f"{self.identity}.MP4"
def get_deleted(self) -> List[Path]:
return [p for p in (self.low_res, self.thumbnail, self.video) if not p.exists()]
def get_exists(self) -> List[Path]:
return [p for p in (self.low_res, self.thumbnail, self.video) if p.exists()]
collector = dict()
for fname in ROOT.glob("*.*"):
# collect and gather videos
if fname.suffix.upper() in (".MP4", ".THM", ".LRV"):
if fname.stem not in collector:
collector[fname.stem] = VideoPart(fname.stem)
for identity, videopart in collector.items():
deleted = videopart.get_deleted()
if len(deleted):
print(
f"Content {identity}: the parts for '{', '.join(d.suffix[1:] for d in deleted)}' seems to have deleted. Remaining:"
)
remaining_parts = videopart.get_exists()
for i, remaining in enumerate(remaining_parts):
print(f" {i+1}. {remaining.name}")
out = "lol"
while out not in ("", "y", "n"):
_range = "/".join(f"{i+1}" for i in range(len(remaining_parts)))
out = input(
f" > Delete the remaining parts? [y(es)/N(o) | view: {_range}] "
).lower()
try:
num = int(out)
except ValueError:
pass
else:
import subprocess
if 0 < num <= len(remaining_parts):
print(
subprocess.check_output(["xdg-open", remaining_parts[num - 1]])
)
if out == "y":
for remaining in videopart.get_exists():
print(f" > deleting {remaining.name}")
remaining.unlink()
#!/usr/bin/env python3
import glob
import dataclasses
import subprocess
import argparse
import os
import sys
import tqdm
from abc import ABC
from datetime import datetime
from pathlib import Path
from typing import List
from collections import defaultdict
# from joblib import Parallel, delayed
parser = argparse.ArgumentParser()
parser.add_argument(
"input_path",
help="input folder that contains images/videos",
)
parser.add_argument(
"--dry-run",
"-d",
action="store_true",
help="input folder that contains images/videos",
)
parser.add_argument(
"--dji",
action="store_true",
help="use dji action cam",
)
args = parser.parse_args()
input_path = Path(args.input_path)
if not input_path.exists():
print(f"Given input paht {input_path} does not exists")
ROOT = input_path
# ROOT = Path(__file__).parent / "DCIM" / "100GOPRO"
GLOBAL_RENAME_REGISTRY = set()
def get_creation_date(filename: str):
return (
subprocess.check_output(
[
"exiftool",
"-T",
"-api",
"LargeFileSupport=1", # enable flag for large files
"-CreateDate",
filename, # target file
]
)
.decode()
.strip()
)
def format_creation_date(filename: str) -> str:
create_date = get_creation_date(filename)
date_obj = datetime.strptime(create_date, "%Y:%m:%d %H:%M:%S")
return date_obj.strftime("%Y-%m-%d_%H%M%S")
class Renameable(ABC):
def __init__(self) -> None:
self.renames = []
@classmethod
def glob(cls) -> List[str]:
raise NotImplementedError()
def perform_rename(self):
for a, b in self.renames:
os.rename(a, b)
def _verify_rename(self, original, new):
print(f"{original} -> {new}")
if os.path.exists(new):
raise ValueError(f"Destination exists!")
if new in GLOBAL_RENAME_REGISTRY:
raise ValueError(f"Multiple files will be renamed to same destination!")
GLOBAL_RENAME_REGISTRY.add(new)
self.renames.append((original, new))
class RenameableRecordSession(Renameable):
def __init__(self) -> None:
super().__init__()
self.video_parts: List["VideoPart"] = []
@classmethod
def create_video_part(cls, dir: Path, basename: str):
raise NotImplementedError()
def add_part(self, part: "VideoPart"):
self.video_parts.append(part)
class Photo(Renameable):
def __init__(self, filepath: Path) -> None:
super().__init__()
target_name = (
filepath.parent / f"{format_creation_date(filepath)}{filepath.suffix}"
)
self._verify_rename(str(filepath), str(target_name))
###############################################################################
###############################################################################
class GoProPhoto(Photo):
@classmethod
def glob(cls) -> List[str]:
return ROOT.glob("GOPR*.JPG")
class GoProRecordSession(RenameableRecordSession):
@dataclasses.dataclass
class VideoPart:
low_res: Path
thumbnail: Path
video: Path
identity: str
def __init__(self, dir: Path, basename: str) -> None:
basename = basename[2:]
self.identity = basename
self.low_res = dir / f"GL{basename}.LRV"
self.thumbnail = dir / f"GX{basename}.THM"
self.video = dir / f"GX{basename}.MP4"
self.dir = dir
for path in [
self.low_res,
self.thumbnail,
self.video,
]:
if not path.exists():
raise ValueError(f"{path} does not exists")
@property
def session_name(self) -> str:
return self.identity[2:]
@property
def part_num(self) -> int:
return int(self.identity[:2])
def __lt__(self, other: "GoProRecordSession.VideoPart"):
return self.part_num < other.part_num
##################################################################
@classmethod
def glob(cls) -> List[str]:
return ROOT.glob("G[XL]*.[mM][pP]4")
@classmethod
def create_video_part(cls, dir: Path, basename: str):
return GoProRecordSession.VideoPart(dir, basename)
def ensure_all_parts_collected(self):
self.video_parts = sorted(self.video_parts)
if len(self.video_parts) != self.video_parts[-1].part_num:
raise ValueError(
f"Got latest part {self.video_parts[-1]},"
f"but was only able to collect {len(self.video_parts)} parts: {self.video_parts}"
)
def prepare_rename(self):
self.renames.clear()
for part in self.video_parts:
new_name = format_creation_date(str(self.video_parts[0].video))
if len(self.video_parts) > 1:
new_name += f".{part.part_num:02d}"
self._verify_rename(part.low_res, part.dir / f"{new_name}[Low-Res].mp4")
self._verify_rename(part.thumbnail, part.dir / f"{new_name}.THM")
self._verify_rename(part.video, part.dir / f"{new_name}.mp4")
class DjiPhoto(Photo):
@classmethod
def glob(cls) -> List[str]:
return ROOT.glob("DJI_*.JPG")
class DjiRecordSession(RenameableRecordSession):
@dataclasses.dataclass
class VideoPart:
low_res: Path
video: Path
identity: str
def __init__(self, dir: Path, basename: str) -> None:
basename = basename[4:]
self.identity = basename
self.low_res = dir / f"DJI_{basename}.LRF"
self.video = dir / f"DJI_{basename}.MP4"
self.dir = dir
for path in [
self.low_res,
self.video,
]:
if not path.exists():
raise ValueError(f"{path} does not exists")
@property
def session_name(self) -> str:
return self.identity
@property
def part_num(self) -> int:
# not used
return 1
# return int(self.identity[:4])
def __lt__(self, other: "DjiRecordSession.VideoPart"):
return self.part_num < other.part_num
##################################################################
@classmethod
def glob(cls) -> List[str]:
return ROOT.glob("DJI_*.[mM][pP]4")
@classmethod
def create_video_part(cls, dir: Path, basename: str):
return DjiRecordSession.VideoPart(dir, basename)
def ensure_all_parts_collected(self):
self.video_parts = sorted(self.video_parts)
if len(self.video_parts) != self.video_parts[-1].part_num:
raise ValueError(
f"Got latest part {self.video_parts[-1]},"
f"but was only able to collect {len(self.video_parts)} parts: {self.video_parts}"
)
def prepare_rename(self):
self.renames.clear()
for part in self.video_parts:
new_name = format_creation_date(str(self.video_parts[0].video))
if len(self.video_parts) > 1:
new_name += f".{part.part_num:02d}"
self._verify_rename(part.low_res, part.dir / f"{new_name}[Low-Res].mp4")
self._verify_rename(part.video, part.dir / f"{new_name}.mp4")
@dataclasses.dataclass
class ActionCamera:
photo_cls: Renameable
video_cls: RenameableRecordSession
GoProActionCamera = ActionCamera(GoProPhoto, GoProRecordSession)
DjiActionCamera = ActionCamera(DjiPhoto, DjiRecordSession)
###############################################################################
###############################################################################
action_cam = GoProActionCamera
if args.dji:
action_cam = DjiActionCamera
record_sections = defaultdict(action_cam.video_cls)
for fname in action_cam.video_cls.glob():
# collect and gather videos
part = action_cam.video_cls.create_video_part(fname.parent, fname.stem)
record_sections[part.session_name].add_part(part)
for s in record_sections.values():
s.ensure_all_parts_collected()
for s in tqdm.tqdm(record_sections.values()):
s.prepare_rename()
if not args.dry_run:
for s in record_sections.values():
s.perform_rename()
photos = []
for fname in action_cam.photo_cls.glob():
# for fname in ROOT.glob("G0*.JPG"):
# collect and gather videos
photos.append(action_cam.photo_cls(fname))
if not args.dry_run:
for s in photos:
s.perform_rename()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment