Last active
October 18, 2023 06:53
-
-
Save soraxas/f2c0cc5f2207997932afadad35971080 to your computer and use it in GitHub Desktop.
A script that renames recorded videos to friendly names
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import glob | |
import dataclasses | |
import subprocess | |
import os | |
import tqdm | |
from abc import ABC | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List | |
from collections import defaultdict | |
# from joblib import Parallel, delayed | |
# Media directory, located relative to the folder that contains this script.
ROOT = Path(__file__).parent.joinpath("DCIM", "100GOPRO")
# Module-level set, created empty.
GLOBAL_RENAME_REGISTRY = set()
@dataclasses.dataclass
class VideoPart:
    """One recording, addressed by the file stem its companion files share.

    ``identity`` is the stem; the low-resolution, thumbnail and full video
    files are looked up in ``ROOT`` as ``<identity>.LRV``, ``<identity>.THM``
    and ``<identity>.MP4``.

    ``identity`` is declared as a real dataclass field so that the generated
    ``__eq__`` and ``__repr__`` take it into account; without a field every
    instance would compare equal to every other one.
    """

    identity: str

    @property
    def low_res(self) -> Path:
        """Path of the ``.LRV`` file of this recording."""
        return ROOT / f"{self.identity}.LRV"

    @property
    def thumbnail(self) -> Path:
        """Path of the ``.THM`` file of this recording."""
        return ROOT / f"{self.identity}.THM"

    @property
    def video(self) -> Path:
        """Path of the ``.MP4`` file of this recording."""
        return ROOT / f"{self.identity}.MP4"

    def _parts(self) -> List[Path]:
        """Return the three companion paths in a fixed order."""
        return [self.low_res, self.thumbnail, self.video]

    def get_deleted(self) -> List[Path]:
        """Return the companion paths that do not exist on disk."""
        return [p for p in self._parts() if not p.exists()]

    def get_exists(self) -> List[Path]:
        """Return the companion paths that exist on disk."""
        return [p for p in self._parts() if p.exists()]
# Index the recordings found in ROOT by file stem: the .MP4, .THM and .LRV
# files of one recording share a stem and therefore map to one VideoPart.
collector = dict()
for fname in ROOT.glob("*.*"):
    # collect and gather videos
    if fname.suffix.upper() in (".MP4", ".THM", ".LRV"):
        if fname.stem not in collector:
            collector[fname.stem] = VideoPart(fname.stem)

# For every recording with at least one missing companion file, list what is
# left and ask whether the leftovers should be removed as well.
for identity, videopart in collector.items():
    deleted = videopart.get_deleted()
    if not deleted:
        continue

    print(
        f"Content {identity}: the parts for '{', '.join(d.suffix[1:] for d in deleted)}' seems to have deleted. Remaining:"
    )
    remaining_parts = videopart.get_exists()
    for i, remaining in enumerate(remaining_parts, start=1):
        print(f" {i}. {remaining.name}")

    # The list of valid "view" numbers does not change while prompting.
    _range = "/".join(str(i) for i in range(1, len(remaining_parts) + 1))

    out = None
    while out not in ("", "y", "n"):
        out = (
            input(f" > Delete the remaining parts? [y(es)/N(o) | view: {_range}] ")
            .strip()
            .lower()
        )
        # The prompt advertises the long forms, so accept them too.
        out = {"yes": "y", "no": "n"}.get(out, out)
        try:
            num = int(out)
        except ValueError:
            # Not a number: the loop condition decides whether to ask again.
            continue
        if 0 < num <= len(remaining_parts):
            target = remaining_parts[num - 1]
            try:
                print(subprocess.check_output(["xdg-open", target]))
            except (OSError, subprocess.CalledProcessError) as err:
                # A broken or missing viewer must not abort the clean-up.
                print(f" > could not open {target.name}: {err}")

    if out == "y":
        # Re-query the disk so that only files still present are removed.
        for remaining in videopart.get_exists():
            print(f" > deleting {remaining.name}")
            remaining.unlink()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import glob | |
import dataclasses | |
import subprocess | |
import argparse | |
import os | |
import sys | |
import tqdm | |
from abc import ABC | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List | |
from collections import defaultdict | |
# from joblib import Parallel, delayed | |
# Command-line interface of the rename script.
parser = argparse.ArgumentParser()
parser.add_argument(
    "input_path",
    help="input folder that contains images/videos",
)
parser.add_argument(
    "--dry-run",
    "-d",
    action="store_true",
    help="only print the planned renames, do not rename any file",
)
parser.add_argument(
    "--dji",
    action="store_true",
    help="use dji action cam",
)
args = parser.parse_args()

input_path = Path(args.input_path)
if not input_path.is_dir():
    # Stop right away instead of silently continuing with a folder that
    # cannot be searched; parser.error() prints the usage and exits.
    parser.error(
        f"Given input path {input_path} does not exist or is not a directory"
    )

# Folder that every glob() below searches.
ROOT = input_path
# Destinations already claimed by a planned rename, shared by all Renameables.
GLOBAL_RENAME_REGISTRY = set()
def get_creation_date(filename: str):
    """Return the ``CreateDate`` value that ``exiftool`` reports for *filename*.

    The tool's standard output is decoded and stripped of surrounding
    whitespace. ``subprocess.CalledProcessError`` propagates when exiftool
    exits with a non-zero status.
    """
    command = [
        "exiftool",
        "-T",
        "-api",
        "LargeFileSupport=1",  # enable flag for large files
        "-CreateDate",
        filename,  # target file
    ]
    raw_output = subprocess.check_output(command)
    return raw_output.decode().strip()
def format_creation_date(filename: str) -> str:
    """Return the creation date of *filename* as ``YYYY-MM-DD_HHMMSS``.

    The date is read with ``get_creation_date`` and must have the form
    ``YYYY:MM:DD HH:MM:SS``; anything else makes ``strptime`` raise
    ``ValueError``.
    """
    parsed = datetime.strptime(get_creation_date(filename), "%Y:%m:%d %H:%M:%S")
    return f"{parsed:%Y-%m-%d_%H%M%S}"
class Renameable(ABC):
    """Base class for things that plan renames first and execute them later.

    Planned renames are stored in ``self.renames`` as ``(original, new)``
    pairs; ``perform_rename`` applies them with ``os.rename``.
    """

    def __init__(self) -> None:
        self.renames = []

    @classmethod
    def glob(cls) -> List[str]:
        """Return the files this kind of item is built from (subclass hook)."""
        raise NotImplementedError()

    def perform_rename(self):
        """Apply every planned rename, in the order it was registered."""
        for a, b in self.renames:
            os.rename(a, b)

    def _verify_rename(self, original, new):
        """Print and register the rename ``original -> new``.

        Raises:
            ValueError: if ``new`` already exists on disk, or if another
                rename registered in ``GLOBAL_RENAME_REGISTRY`` already
                targets the same destination.
        """
        print(f"{original} -> {new}")
        if os.path.exists(new):
            raise ValueError("Destination exists!")
        # Callers pass both str and Path destinations. Register the string
        # form so the same destination is recognised in either flavour.
        registry_key = str(new)
        if registry_key in GLOBAL_RENAME_REGISTRY:
            raise ValueError("Multiple files will be renamed to same destination!")
        GLOBAL_RENAME_REGISTRY.add(registry_key)
        self.renames.append((original, new))
class RenameableRecordSession(Renameable):
    """A ``Renameable`` that is made up of a list of video parts."""

    def __init__(self) -> None:
        Renameable.__init__(self)
        # Parts collected so far, in insertion order.
        self.video_parts: List["VideoPart"] = []

    @classmethod
    def create_video_part(cls, dir: Path, basename: str):
        """Build the part object for ``basename`` inside ``dir`` (subclass hook)."""
        raise NotImplementedError

    def add_part(self, part: "VideoPart"):
        """Append ``part`` to this session."""
        self.video_parts += [part]
class Photo(Renameable):
    """A single photo that is renamed to its creation date."""

    def __init__(self, filepath: Path) -> None:
        """Plan the rename of ``filepath``.

        The new name is the formatted creation date followed by the original
        suffix, in the same directory as the photo.
        """
        super().__init__()
        stamp = format_creation_date(filepath)
        destination = filepath.parent.joinpath(stamp + filepath.suffix)
        self._verify_rename(str(filepath), str(destination))
############################################################################### | |
############################################################################### | |
class GoProPhoto(Photo):
    """Photos in ``ROOT`` whose names match ``GOPR*.JPG``."""

    @classmethod
    def glob(cls) -> List[Path]:
        """Return the matching photo paths as a sorted list.

        ``Path.glob`` is a one-shot generator that yields in arbitrary order;
        a sorted list matches the declared list return type and makes the
        processing order reproducible.
        """
        return sorted(ROOT.glob("GOPR*.JPG"))
class GoProRecordSession(RenameableRecordSession):
    """All video parts that share one session name.

    The first two characters of a file stem are dropped; of the remainder,
    the first two characters are the part number and the rest is the session
    name.
    """

    @dataclasses.dataclass
    class VideoPart:
        """The ``.LRV``, ``.THM`` and ``.MP4`` files of one video part."""

        low_res: Path
        thumbnail: Path
        video: Path
        identity: str

        def __init__(self, dir: Path, basename: str) -> None:
            """Locate the three files of ``basename`` inside ``dir``.

            Raises:
                ValueError: if any of the three files is missing.
            """
            # Drop the two-character prefix; it is re-added per file below.
            basename = basename[2:]
            self.identity = basename
            self.low_res = dir / f"GL{basename}.LRV"
            self.thumbnail = dir / f"GX{basename}.THM"
            self.video = dir / f"GX{basename}.MP4"
            # Not a dataclass field, so it takes no part in __eq__/__repr__.
            self.dir = dir
            for path in (self.low_res, self.thumbnail, self.video):
                if not path.exists():
                    raise ValueError(f"{path} does not exists")

        @property
        def session_name(self) -> str:
            """Identity without its leading two-character part number."""
            return self.identity[2:]

        @property
        def part_num(self) -> int:
            """Leading two characters of the identity, as an integer."""
            return int(self.identity[:2])

        def __lt__(self, other: "GoProRecordSession.VideoPart"):
            # Order parts by part number so sorted() puts them in sequence.
            return self.part_num < other.part_num

    ##################################################################

    @classmethod
    def glob(cls) -> List[Path]:
        """Return the candidate video files in ``ROOT`` as a sorted list.

        ``Path.glob`` yields in arbitrary order; sorting keeps runs
        reproducible and matches the declared list return type.
        """
        return sorted(ROOT.glob("G[XL]*.[mM][pP]4"))

    @classmethod
    def create_video_part(cls, dir: Path, basename: str):
        """Build the ``VideoPart`` for ``basename`` inside ``dir``."""
        return GoProRecordSession.VideoPart(dir, basename)

    def ensure_all_parts_collected(self):
        """Sort the parts and check that none is missing.

        Raises:
            ValueError: if no part was collected, or if the number of parts
                differs from the highest part number.
        """
        if not self.video_parts:
            raise ValueError("No video parts were collected for this session")
        self.video_parts = sorted(self.video_parts)
        if len(self.video_parts) != self.video_parts[-1].part_num:
            raise ValueError(
                f"Got latest part {self.video_parts[-1]}, "
                f"but was only able to collect {len(self.video_parts)} parts: {self.video_parts}"
            )

    def prepare_rename(self):
        """Plan the renames of every file of every part.

        All parts are named after the creation date of the first part's
        video; sessions with more than one part get a ``.NN`` part suffix.
        """
        self.renames.clear()
        if not self.video_parts:
            return
        # The date comes from the first part only, so query exiftool once
        # instead of once per part.
        session_stamp = format_creation_date(str(self.video_parts[0].video))
        multi_part = len(self.video_parts) > 1
        for part in self.video_parts:
            new_name = session_stamp
            if multi_part:
                new_name += f".{part.part_num:02d}"
            self._verify_rename(part.low_res, part.dir / f"{new_name}[Low-Res].mp4")
            self._verify_rename(part.thumbnail, part.dir / f"{new_name}.THM")
            self._verify_rename(part.video, part.dir / f"{new_name}.mp4")
class DjiPhoto(Photo):
    """Photos in ``ROOT`` whose names match ``DJI_*.JPG``."""

    @classmethod
    def glob(cls) -> List[Path]:
        """Return the matching photo paths as a sorted list.

        ``Path.glob`` is a one-shot generator that yields in arbitrary order;
        a sorted list matches the declared list return type and makes the
        processing order reproducible.
        """
        return sorted(ROOT.glob("DJI_*.JPG"))
class DjiRecordSession(RenameableRecordSession):
    """A recording made of ``DJI_*`` files.

    The session name is the file stem without its first four characters, and
    every part reports part number 1.
    """

    @dataclasses.dataclass
    class VideoPart:
        """The ``.LRF`` and ``.MP4`` files of one video."""

        low_res: Path
        video: Path
        identity: str

        def __init__(self, dir: Path, basename: str) -> None:
            """Locate the two files of ``basename`` inside ``dir``.

            Raises:
                ValueError: if either file is missing.
            """
            # Drop the four-character prefix; it is re-added per file below.
            basename = basename[4:]
            self.identity = basename
            self.low_res = dir / f"DJI_{basename}.LRF"
            self.video = dir / f"DJI_{basename}.MP4"
            # Not a dataclass field, so it takes no part in __eq__/__repr__.
            self.dir = dir
            for path in (self.low_res, self.video):
                if not path.exists():
                    raise ValueError(f"{path} does not exists")

        @property
        def session_name(self) -> str:
            """The whole identity; every file is its own session."""
            return self.identity

        @property
        def part_num(self) -> int:
            """Always 1."""
            # not used
            return 1
            # return int(self.identity[:4])

        def __lt__(self, other: "DjiRecordSession.VideoPart"):
            return self.part_num < other.part_num

    ##################################################################

    @classmethod
    def glob(cls) -> List[Path]:
        """Return the candidate video files in ``ROOT`` as a sorted list.

        ``Path.glob`` yields in arbitrary order; sorting keeps runs
        reproducible and matches the declared list return type.
        """
        return sorted(ROOT.glob("DJI_*.[mM][pP]4"))

    @classmethod
    def create_video_part(cls, dir: Path, basename: str):
        """Build the ``VideoPart`` for ``basename`` inside ``dir``."""
        return DjiRecordSession.VideoPart(dir, basename)

    def ensure_all_parts_collected(self):
        """Sort the parts and check that none is missing.

        Raises:
            ValueError: if no part was collected, or if the number of parts
                differs from the highest part number.
        """
        if not self.video_parts:
            raise ValueError("No video parts were collected for this session")
        self.video_parts = sorted(self.video_parts)
        if len(self.video_parts) != self.video_parts[-1].part_num:
            raise ValueError(
                f"Got latest part {self.video_parts[-1]}, "
                f"but was only able to collect {len(self.video_parts)} parts: {self.video_parts}"
            )

    def prepare_rename(self):
        """Plan the renames of every file of every part.

        All parts are named after the creation date of the first part's
        video; sessions with more than one part get a ``.NN`` part suffix.
        """
        self.renames.clear()
        if not self.video_parts:
            return
        # The date comes from the first part only, so query exiftool once
        # instead of once per part.
        session_stamp = format_creation_date(str(self.video_parts[0].video))
        multi_part = len(self.video_parts) > 1
        for part in self.video_parts:
            new_name = session_stamp
            if multi_part:
                new_name += f".{part.part_num:02d}"
            self._verify_rename(part.low_res, part.dir / f"{new_name}[Low-Res].mp4")
            self._verify_rename(part.video, part.dir / f"{new_name}.mp4")
@dataclasses.dataclass
class ActionCamera:
    """Pairs the photo class and the video-session class of one camera brand.

    Both fields hold classes, not instances: the script calls ``glob()`` and
    ``create_video_part()`` on them and instantiates them itself. The
    annotations are strings so they document that without being evaluated
    when the class is created.
    """

    photo_cls: "type[Renameable]"
    video_cls: "type[RenameableRecordSession]"
GoProActionCamera = ActionCamera(GoProPhoto, GoProRecordSession)
DjiActionCamera = ActionCamera(DjiPhoto, DjiRecordSession)

###############################################################################
###############################################################################

# GoPro is the default; --dji switches to the DJI classes.
action_cam = DjiActionCamera if args.dji else GoProActionCamera

# Videos: group the parts by session name, validate, plan, then rename.
record_sections = defaultdict(action_cam.video_cls)
for fname in action_cam.video_cls.glob():
    part = action_cam.video_cls.create_video_part(fname.parent, fname.stem)
    record_sections[part.session_name].add_part(part)

for s in record_sections.values():
    s.ensure_all_parts_collected()

for s in tqdm.tqdm(record_sections.values()):
    s.prepare_rename()

if not args.dry_run:
    for s in record_sections.values():
        s.perform_rename()

# Photos: constructing a photo object already plans its rename.
photos = [action_cam.photo_cls(fname) for fname in action_cam.photo_cls.glob()]

if not args.dry_run:
    for s in photos:
        s.perform_rename()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment