Last active
June 21, 2025 17:16
-
-
Save YuriyGuts/34631981d02b30bed0dff6171d7aa4cb to your computer and use it in GitHub Desktop.
Merge and encode dashcam videos stored on an SD card
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| r""" | |
| Concatenate and encode dashcam videos stored on an SD card. | |
| Assumes the videos are stored as *xxxx.avi / *xxxx.mp4 / *xxxx.mov, where xxxx is a | |
| sequential index. This should be compatible with most dashcam SoC manufacturers. | |
| System requirements and dependencies: | |
| ------------------------------------- | |
| * Cross-platform (tested on Windows, Linux, and macOS). | |
| * Requires ffmpeg [https://ffmpeg.org/] to be installed and available in PATH. | |
| * No other packages are required. Only built-in Python modules are used. | |
| Usage guide: | |
| ------------ | |
| The tool can operate in two modes. Run "dashcam-encode.py -h" for usage details. | |
| 1) "trips" mode: encodes all videos in the input directory, organizing them into trips | |
| according to file dates. Each logical trip will be encoded as a single output video. | |
| Run "dashcam-encode.py trips -h" for usage details. | |
| Example (default trip gap = 3 hours): | |
| Filename Date Inferred Trip Output File | |
| /DCIM | |
| |---/Video | |
| |-------/20230112152345_000001.MP4 Jan 12, 15:23 | => Trip 1 | Trip 1.mp4 | |
| |-------/20230112152445_000002.MP4 Jan 12, 15:24 | => Trip 1 | | |
| |-------/20230112193820_000003.MP4 Jan 12, 19:38 | => Trip 2 | Trip 2.mp4 | |
| |-------/20230113101200_000004.MP4 Jan 13, 10:12 | => Trip 3 | Trip 3.mp4 | |
| |-------/20230113101300_000005.MP4 Jan 13, 10:13 | => Trip 3 | | |
| 2) "range" mode: selects all raw videos between the specified start/end index and | |
| encodes them as a single output video. | |
| Run "dashcam-encode.py range -h" for usage details. | |
| Example ("range 3 6"): | |
| /DCIM | |
| |---/Video | |
| |-------/20230112152345_000001.MP4 | |
| |-------/20230112152445_000003.MP4 <= These videos | |
| |-------/20230112193820_000004.MP4 <= will be selected | |
| |-------/20230113101200_000006.MP4 <= and merged | |
| |-------/20230113101300_000007.MP4 | |
| Usage examples: | |
| --------------- | |
| Encode all raw video files in the default location, group them into trips, | |
| and save each trip as a separate output video file: | |
| > dashcam-encode.py trips | |
| Encode all raw video files in the specified location, group them into trips, | |
| and save each trip as a separate output video file: | |
| > dashcam-encode.py trips --raw-video-dir "E:\\DCIM\\DCIMA" | |
| Encode all raw video files in the default location, group them into trips | |
| where trips should be at least 8 hours apart, encode 3 trips in parallel: | |
| > dashcam-encode.py trips --min-trip-gap-hours 8 --job-count 3 | |
| Encode raw video files labeled from #15 to #319 (in the default location) | |
| and save them as a single output video file: | |
| > dashcam-encode.py range 15 319 | |
| Encode raw video files labeled from #15 to #319 (in the specified custom location) | |
| and save them as a single output video file named "Road Trip.mp4" | |
| > dashcam-encode.py range 15 319 --output-name "Road Trip" --raw-video-dir "E:\\Video" | |
| """ | |
import argparse
import contextlib
import ctypes
import dataclasses
import datetime
import logging
import multiprocessing
import os
import shlex
import subprocess
import sys
import tempfile
import typing as t
# Default settings (can be overridden in the command line).
# Default value of the --raw-video-dir option.
DEFAULT_VIDEO_DIR_PATH = "E:\\DCIM\\Movie"
# Default value of the --min-trip-gap-hours option.
DEFAULT_MIN_TRIP_GAP_HOURS = 3
# Default value of the --job-count option.
DEFAULT_JOB_COUNT = 2
# Extensions of the video files to expect in the input directory.
# Compared against the lower-cased extension of each file, so keep these in lower case.
EXPECTED_VIDEO_EXTENSIONS = (".avi", ".mp4", ".mov")
# Which FFmpeg executable to run.
FFMPEG_EXECUTABLE = "ffmpeg"
# Codec/quality settings for the output videos.
# Both strings are inserted verbatim into the ffmpeg command line.
OUTPUT_VIDEO_CODEC_PARAMETERS = "-c:v libx265 -crf 30 -preset fast"
OUTPUT_AUDIO_CODEC_PARAMETERS = "-c:a aac -b:a 128k"
# Used as the extension of every output file.
OUTPUT_FORMAT = "mp4"
# Hardware acceleration options. Leave blank ("") for no hardware acceleration.
# Note: not all systems may support it.
# These options are placed before "-i" in the encoding command.
# NOTE(review): "d3d12va" looks Windows-specific -- confirm that this default works
# on Linux/macOS, or blank it out there.
FFMPEG_HWACCEL_OPTIONS = "-hwaccel d3d12va"
# Log messages in this file are built with f-strings on purpose.
# pylint: disable=logging-fstring-interpolation
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)8s | %(message)s")
LOGGER = logging.getLogger(__name__)
@dataclasses.dataclass
class RawVideoSegment:
    """
    One raw video file found in the input directory.

    Dashcams usually split a recording into short files (e.g. 1-minute segments);
    each such file is described by one instance of this class.
    """

    # Numeric index extracted from the file name.
    index: int
    # Path of the video file.
    filename: str
@dataclasses.dataclass
class Trip:
    """A run of video segments that belongs together and is encoded as one output file."""

    # Human-readable trip label, e.g. "Trip 1".
    id: str
    # Date associated with the trip; shown as YYYY-MM-DD in names and log lines.
    date: datetime.datetime
    # Segments of the trip, in the order they should be concatenated.
    raw_segments: list[RawVideoSegment]

    def get_full_trip_name(self) -> str:
        """Return the trip label prefixed with the trip date, e.g. "2023-01-12 Trip 1"."""
        return f"{self.date:%Y-%m-%d} {self.id}"

    def __str__(self) -> str:
        """Describe the trip by its label, date and first/last segment index."""
        first_segment = self.raw_segments[0]
        last_segment = self.raw_segments[-1]
        return (
            f"{self.id} / Date: {self.date:%Y-%m-%d} / "
            f"Segments: {first_segment.index}-{last_segment.index}"
        )
@dataclasses.dataclass
class TripEncodeJobDefinition:
    """Everything a worker in the parallel encoding pool needs to encode one trip."""

    # The trip whose segments should be concatenated and encoded.
    trip: Trip
    # The raw video directory the job was created for.
    raw_video_dir: str
@contextlib.contextmanager
def prevent_os_sleep() -> t.Iterator[None]:
    """
    Keep the operating system awake while the body of the ``with`` block runs.

    * Windows: calls the Win32 ``SetThreadExecutionState`` API.
    * macOS: runs ``caffeinate`` in the background.
    * Linux: runs ``systemd-inhibit`` in the background.
    * Other platforms: logs a warning and does nothing.

    Sleep prevention is best-effort: if the helper program cannot be started
    (e.g. it is not installed), a warning is logged and the body still runs.
    """
    LOGGER.info(f"Preventing the OS from sleeping (platform: '{sys.platform}')")

    # Windows: call the Win32 API.
    if sys.platform.startswith("win"):
        es_continuous = 0x80000000
        es_system_required = 0x00000001
        ctypes.windll.kernel32.SetThreadExecutionState(es_continuous | es_system_required)
        try:
            yield
        finally:
            LOGGER.info("Allowing the OS to sleep again")
            # Passing the "continuous" flag alone drops the "system required" flag set above.
            ctypes.windll.kernel32.SetThreadExecutionState(es_continuous)
        return

    # macOS: launch `caffeinate`.
    if sys.platform == "darwin":
        inhibitor_cmd = ["caffeinate"]
    # Linux: launch `systemd-inhibit`.
    elif sys.platform == "linux":
        inhibitor_cmd = [
            "systemd-inhibit",
            "--what=idle:sleep",
            "--why=Prevent OS sleep while the script is running",
            "bash",
            "-c",
            "while true; do sleep 60; done",
        ]
    # Other operating systems.
    else:
        LOGGER.warning(f"Sleep prevention not implemented for '{sys.platform}'")
        yield
        return

    try:
        inhibitor_proc = subprocess.Popen(inhibitor_cmd)
    except OSError as exc:
        # A missing helper must not abort the encoding itself.
        LOGGER.warning(f"Could not start '{inhibitor_cmd[0]}' ({exc}); the OS may go to sleep")
        yield
        return

    try:
        yield
    finally:
        LOGGER.info("Allowing the OS to sleep again")
        inhibitor_proc.terminate()
        # Reap the helper so that it does not linger as a zombie process.
        try:
            inhibitor_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            inhibitor_proc.kill()
            inhibitor_proc.wait()
def parse_command_line_args(args: list[str]) -> argparse.Namespace:
    """
    Parse the arguments passed via the command line.

    Parameters
    ----------
    args
        The command line arguments without the program name (e.g. ``sys.argv[1:]``).

    Returns
    -------
    argparse.Namespace
        The parsed arguments. ``command`` holds the selected subcommand
        ("trips" or "range"); the remaining attributes depend on the subcommand.

    Raises
    ------
    SystemExit
        Raised by argparse when the arguments are invalid, when no subcommand
        is given, or when help is requested.
    """
    parser = argparse.ArgumentParser(
        description="Concatenate and encode dashcam videos stored on an SD card."
    )
    # A subcommand is mandatory: without one there is nothing to do.
    subparsers = parser.add_subparsers(dest="command", metavar="COMMAND", required=True)

    parser_trips_cmd = subparsers.add_parser(
        name="trips",
        help=(
            "Encode all videos in the input directory, organizing them into "
            "trips according to file dates."
        ),
    )
    parser_trips_cmd.add_argument(
        "--min-trip-gap-hours",
        metavar="TG",
        help=(
            "The minimum date difference (in hours) between consecutive files "
            "for them to be considered separate trips."
        ),
        type=int,
        required=False,
        default=DEFAULT_MIN_TRIP_GAP_HOURS,
    )
    parser_trips_cmd.add_argument(
        "--job-count",
        metavar="JC",
        help=(
            "The maximum number of trips allowed to be encoded in parallel. "
            "Adjust this number to change system resource utilization."
        ),
        type=int,
        required=False,
        default=DEFAULT_JOB_COUNT,
    )

    parser_range_cmd = subparsers.add_parser(
        name="range",
        help=(
            "Concatenate input videos in the specified range "
            "and encode them into a single output video"
        ),
    )
    parser_range_cmd.add_argument(
        "start_index",
        metavar="START-INDEX",
        help=(
            "Index of the first video (inclusive). "
            "Example: for 20230112152345_000007.MP4, specify 7."
        ),
        type=int,
    )
    parser_range_cmd.add_argument(
        "end_index",
        metavar="END-INDEX",
        help=(
            "Index of the last video (inclusive). "
            "Example: for 20230112170446_000094.MP4, specify 94."
        ),
        type=int,
    )
    parser_range_cmd.add_argument(
        "--output-name",
        metavar="NAME",
        help=(
            "Name of the output video (without extension). "
            "If omitted, a name will be generated automatically."
        ),
        type=str,
        required=False,
    )

    # Options shared by both subcommands.
    for subparser in [parser_trips_cmd, parser_range_cmd]:
        subparser.add_argument(
            "--dry-run",
            action="store_true",
            help=(
                "Print out the discovered trips or segments, but do not "
                "run ffmpeg for actual encoding."
            ),
        )
        subparser.add_argument(
            "--raw-video-dir",
            metavar="PATH",
            help=(
                f"Path to the raw video directory on the SD card, such as "
                f"'{DEFAULT_VIDEO_DIR_PATH}'."
            ),
            type=str,
            required=False,
            default=DEFAULT_VIDEO_DIR_PATH,
        )
        subparser.add_argument(
            "--skip-raw-video-validation",
            action="store_true",
            help=(
                "Do not check the raw videos for readability. This is faster, "
                "but may result in abruptly cut encoded videos in case of "
                "corrupted files."
            ),
        )

    parsed_args = parser.parse_args(args)

    # Reject a worker count the process pool cannot be created with.
    if parsed_args.command == "trips" and parsed_args.job_count < 1:
        parser_trips_cmd.error("--job-count must be at least 1")

    return parsed_args
def video_index_from_filename(filename: str) -> int:
    """
    Extract the numeric index of the video from its filename.

    E.g., /path/to/videos/20230112170046_000090.MP4 -> 90

    When the name ends with a run of more than four digits that follows a
    non-digit separator (as in the example above), the whole run is the index,
    so indices above 9999 are not truncated. Otherwise the last four characters
    of the name are used.

    Raises
    ------
    ValueError
        If the relevant part of the name is not a number.
    """
    stem, _ = os.path.splitext(os.path.basename(filename))
    # Split the name into "everything before the trailing digits" + "trailing digits".
    prefix = stem.rstrip("0123456789")
    trailing_digits = stem[len(prefix):]
    if prefix and len(trailing_digits) > 4:
        return int(trailing_digits)
    # Short suffixes and all-digit names keep the 4-character rule.
    return int(stem[-4:])
def is_raw_video_readable(filename: str) -> bool:
    """
    Check whether the specified video file can be read by `ffmpeg`.

    Asks ffmpeg to decode the first 0.1 seconds of the file into a null output.

    Returns
    -------
    bool
        True if ffmpeg exits with a zero return code, False otherwise.

    Raises
    ------
    OSError
        If the ffmpeg executable cannot be launched (e.g. it is not installed).
    """
    # An argument list (no shell) keeps quotes and other special characters in
    # the file name from being interpreted.
    cmd = [FFMPEG_EXECUTABLE, "-v", "error", "-t", "0.1", "-i", filename, "-f", "null", "-"]
    LOGGER.info(f"Running: {shlex.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, check=False)
    return result.returncode == 0
def collect_raw_video_segments(
    raw_video_dir_path: str,
    start_index: int | None = None,
    end_index: int | None = None,
    check_readability: bool = True,
) -> list[RawVideoSegment]:
    """
    Scan the raw video directory for files matching the input criteria.

    Parameters
    ----------
    raw_video_dir_path
        Directory to scan (not recursive).
    start_index
        If given, only keep segments whose index is at least this value.
    end_index
        If given, only keep segments whose index is at most this value.
    check_readability
        If True, drop the files that ffmpeg cannot read.

    Returns
    -------
    list[RawVideoSegment]
        The matching segments, sorted by index.

    Raises
    ------
    RuntimeError
        If no file matches the criteria.
    """
    LOGGER.info(f"Collecting files from '{raw_video_dir_path}'")
    filenames = [
        os.path.join(raw_video_dir_path, filename)
        for filename in os.listdir(raw_video_dir_path)
        if os.path.splitext(filename)[-1].lower() in EXPECTED_VIDEO_EXTENSIONS
    ]
    LOGGER.info(f"Found {len(filenames)} files with supported extensions")

    segments = []
    for filename in filenames:
        try:
            index = video_index_from_filename(filename)
        except ValueError:
            # A stray video without a numeric suffix should not abort the whole run.
            LOGGER.warning(f"Cannot extract a video index from '{filename}'; skipping")
            continue
        segments.append(RawVideoSegment(index=index, filename=filename))
    segments.sort(key=lambda segment: segment.index)

    # Each bound applies on its own, so an open-ended range works too.
    if start_index is not None:
        segments = [segment for segment in segments if segment.index >= start_index]
    if end_index is not None:
        segments = [segment for segment in segments if segment.index <= end_index]

    if check_readability:
        LOGGER.info("Checking raw video files for readability...")
        validated_segments = []
        for segment in segments:
            if is_raw_video_readable(segment.filename):
                validated_segments.append(segment)
            else:
                LOGGER.warning(f"The input file {segment.filename} is not readable; skipping")
        segments = validated_segments

    if not segments:
        raise RuntimeError("Could not find any files matching the input criteria")

    LOGGER.info(f"Collected {len(segments)} files matching the input criteria")
    return segments
def group_segments_into_trips(
    segments: list[RawVideoSegment],
    min_trip_gap_hours: int,
) -> list[Trip]:
    """
    Bucket video segments into trips based on the modification dates of their files.

    The segments are walked in the given order. A new trip starts at the first
    segment, and again whenever a file is more than `min_trip_gap_hours` newer
    than the file before it. Trips are labeled "Trip 1", "Trip 2", ... and are
    dated by their first segment.
    """
    modification_dates = [
        datetime.datetime.fromtimestamp(os.path.getmtime(segment.filename))
        for segment in segments
    ]

    trips: list[Trip] = []
    previous_date = None
    for segment, current_date in zip(segments, modification_dates, strict=True):
        if previous_date is None:
            starts_new_trip = True
        else:
            gap_hours = (current_date - previous_date).total_seconds() / 3600.0
            starts_new_trip = gap_hours > min_trip_gap_hours

        if starts_new_trip:
            trips.append(Trip(id=f"Trip {len(trips) + 1}", date=current_date, raw_segments=[]))

        # The segment always joins the most recently opened trip.
        trips[-1].raw_segments.append(segment)
        previous_date = current_date

    return trips
def generate_ffmpeg_input_file(segments: list[RawVideoSegment]) -> str:
    """
    Build an input file list for ffmpeg containing the files to be concatenated.

    The file is created in the system temporary directory and is NOT deleted
    automatically; the caller is responsible for removing it.

    Returns
    -------
    str
        The path of the generated ffmpeg input file.
    """
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        prefix="dashcam-encode-",
        suffix=".txt",
        delete=False,
    ) as fp:
        LOGGER.info(f"Writing ffmpeg file list to '{fp.name}'")
        for segment in segments:
            # Inside a single-quoted ffmpeg string a quote cannot be escaped directly:
            # close the quote, emit an escaped quote, and reopen it ('\'').
            escaped_filename = segment.filename.replace("'", "'\\''")
            fp.write(f"file '{escaped_filename}'\n")
    return fp.name
def run_ffmpeg(ffmpeg_input_filename: str, output_video_name: str) -> None:
    """
    Run video concatenation and encoding.

    Parameters
    ----------
    ffmpeg_input_filename
        Path of the ffmpeg concat list file naming the videos to merge.
    output_video_name
        Name of the output video without extension; the configured output
        format is appended as the extension.

    Raises
    ------
    subprocess.CalledProcessError
        If ffmpeg exits with a non-zero return code.
    OSError
        If the ffmpeg executable cannot be launched.
    """
    output_filename = f"{output_video_name}.{OUTPUT_FORMAT}"
    # An argument list (no shell) keeps quotes and other special characters in
    # the file names from being interpreted.
    cmd = [
        FFMPEG_EXECUTABLE,
        "-f",
        "concat",
        # Allow ffmpeg to use absolute input paths (-safe 0)
        "-safe",
        "0",
        # The option strings are settings, so they are split like a shell would split them.
        *shlex.split(FFMPEG_HWACCEL_OPTIONS),
        "-i",
        ffmpeg_input_filename,
        # Video and audio codec parameters
        *shlex.split(OUTPUT_VIDEO_CODEC_PARAMETERS),
        *shlex.split(OUTPUT_AUDIO_CODEC_PARAMETERS),
        # MPEG4 output
        output_filename,
    ]
    with prevent_os_sleep():
        LOGGER.info(f"Running: {shlex.join(cmd)}")
        subprocess.run(cmd, check=True)
def run_trip_encoding(trip_job_def: TripEncodeJobDefinition) -> None:
    """
    Run video concatenation and encoding for a single trip.

    The output video is named after the full trip name. The temporary ffmpeg
    file list is removed afterwards, whether or not the encoding succeeded.
    """
    trip = trip_job_def.trip
    trip_full_name = trip.get_full_trip_name()
    ffmpeg_input_filename = generate_ffmpeg_input_file(trip.raw_segments)
    try:
        run_ffmpeg(
            ffmpeg_input_filename=ffmpeg_input_filename,
            output_video_name=trip_full_name,
        )
    finally:
        # The list file is created with delete=False, so clean it up here.
        with contextlib.suppress(OSError):
            os.remove(ffmpeg_input_filename)
    LOGGER.info(f"Trip encoding completed: {trip_full_name}")
def run_trips_command(parsed_args: argparse.Namespace) -> None:
    """
    Entry point for the "trips" subcommand.

    Collects the raw segments, groups them into trips, logs every trip, and
    (unless this is a dry run) encodes the trips in a pool of worker processes.
    """
    validate_videos = not parsed_args.skip_raw_video_validation
    segments = collect_raw_video_segments(
        parsed_args.raw_video_dir,
        check_readability=validate_videos,
    )
    trips = group_segments_into_trips(segments, parsed_args.min_trip_gap_hours)

    LOGGER.info(f"Discovered {len(trips)} trips")
    for discovered_trip in trips:
        LOGGER.info(discovered_trip)

    if parsed_args.dry_run:
        LOGGER.warning("Dry run mode enabled. Not running any encoding jobs.")
        return

    # One job per trip; the pool size limits how many are encoded at once.
    raw_video_dir = parsed_args.raw_video_dir
    job_defs = [TripEncodeJobDefinition(trip, raw_video_dir) for trip in trips]
    with multiprocessing.Pool(parsed_args.job_count) as process_pool:
        process_pool.map(run_trip_encoding, job_defs)
def run_range_command(parsed_args: argparse.Namespace) -> None:
    """
    Entry point for the "range" subcommand.

    Collects the raw segments between the start and end index (inclusive) and,
    unless this is a dry run, encodes them as a single output video. The
    temporary ffmpeg file list is removed afterwards, whether or not the
    encoding succeeded.
    """
    segments = collect_raw_video_segments(
        raw_video_dir_path=parsed_args.raw_video_dir,
        start_index=parsed_args.start_index,
        end_index=parsed_args.end_index,
        check_readability=not parsed_args.skip_raw_video_validation,
    )
    if parsed_args.dry_run:
        LOGGER.warning("Dry run mode enabled. Not running any encoding jobs.")
        return

    # Fall back to a generated name when --output-name is missing or empty.
    output_name = (
        parsed_args.output_name
        or f"dashcam-{parsed_args.start_index}-{parsed_args.end_index}"
    )
    ffmpeg_input_filename = generate_ffmpeg_input_file(segments)
    try:
        run_ffmpeg(
            ffmpeg_input_filename=ffmpeg_input_filename,
            output_video_name=output_name,
        )
    finally:
        # The list file is created with delete=False, so clean it up here.
        with contextlib.suppress(OSError):
            os.remove(ffmpeg_input_filename)
def main() -> None:
    """
    Parse the command line and run the selected subcommand.

    Exits with status 2 if no known subcommand was selected, instead of
    silently doing nothing.
    """
    parsed_args = parse_command_line_args(sys.argv[1:])
    if parsed_args.command == "trips":
        run_trips_command(parsed_args)
    elif parsed_args.command == "range":
        run_range_command(parsed_args)
    else:
        LOGGER.error('No command specified. Run "dashcam-encode.py -h" for usage details.')
        sys.exit(2)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment