Created
April 18, 2026 20:55
-
-
Save TheGroundZero/d5379775d3249fb3ad7fb79345713db7 to your computer and use it in GitHub Desktop.
Merges multipart MKV files (e.g. movie.part-01.mkv, movie.part-02.mkv, ...) into a single MKV, with chapters from each part remapped to their correct timestamps in the merged output.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| merge_mkv_parts.py | |
| Merges multipart MKV files (e.g. movie.part-01.mkv, movie.part-02.mkv, ...) | |
| into a single MKV, with chapters from each part remapped to their correct | |
| timestamps in the merged output. | |
| Requirements: ffmpeg + ffprobe in PATH | |
| Usage: | |
| python3 merge_mkv_parts.py [OPTIONS] | |
| Options: | |
| --input-dir DIR Directory containing the part files (default: current dir) | |
| --pattern GLOB Glob pattern to match parts (default: *part*.mkv) | |
| --output FILE Output file path (default: merged.mkv) | |
| --work-dir DIR Temp directory for intermediate files (default: system temp) | |
| --dry-run Print what would be done without executing | |
| Examples: | |
| # Merge all *part*.mkv in current directory | |
| python3 merge_mkv_parts.py | |
| # Specify directory and output | |
| python3 merge_mkv_parts.py --input-dir ~/Movies/MyFilm --output ~/Movies/MyFilm_merged.mkv | |
| # Custom glob pattern | |
| python3 merge_mkv_parts.py --input-dir /data/show --pattern "episode.s01e01.part-*.mkv" | |
| """ | |
| import argparse | |
| import glob | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def run(cmd, **kwargs):
    """Echo a command line to stdout, then execute it.

    Args:
        cmd: Sequence holding the program and its arguments. Items are
            converted with ``str()`` for the echoed line only.
        **kwargs: Passed through unchanged to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True`` is always passed).
    """
    shown = " ".join(map(str, cmd))
    print(f" $ {shown}")
    return subprocess.run(cmd, check=True, **kwargs)
def ffprobe_json(path, *extra_args):
    """Invoke ``ffprobe`` on *path* and return its JSON report as a dict.

    Args:
        path: File to probe; converted with ``str()`` and passed last.
        *extra_args: Additional ffprobe arguments (e.g. ``"-show_format"``)
            inserted between the fixed options and the path.

    Returns:
        The decoded JSON document that ffprobe wrote to stdout.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits non-zero.
    """
    argv = ["ffprobe", "-v", "quiet", "-print_format", "json"]
    argv.extend(extra_args)
    argv.append(str(path))
    completed = subprocess.run(argv, capture_output=True, text=True, check=True)
    return json.loads(completed.stdout)
def get_duration_seconds(path):
    """Return the container duration of *path* in seconds as a float.

    The value is read from the ``format.duration`` field of
    ``ffprobe -show_format``.
    """
    format_section = ffprobe_json(path, "-show_format")["format"]
    return float(format_section["duration"])
def get_chapters(path):
    """Return the chapters of *path* as reported by ``ffprobe -show_chapters``.

    Args:
        path: Media file to inspect.

    Returns:
        A list of dicts, in ffprobe's order, each with:
        ``id`` (as reported by ffprobe), ``start_time`` and ``end_time``
        (float seconds) and ``title`` (str). A chapter without a ``title``
        tag is named ``"Chapter N"`` where N is its 1-based position in
        the file.
    """
    info = ffprobe_json(path, "-show_chapters")
    chapters = []
    for position, ch in enumerate(info.get("chapters", []), start=1):
        # The fallback title is positional on purpose: ffprobe's chapter id
        # is container-defined (for Matroska it is presumably the ChapterUID,
        # an arbitrary number), so "id + 1" is not a usable chapter number.
        fallback_title = f"Chapter {position}"
        chapters.append({
            "id": ch["id"],
            "start_time": float(ch["start_time"]),
            "end_time": float(ch["end_time"]),
            "title": ch.get("tags", {}).get("title", fallback_title),
        })
    return chapters
def seconds_to_timestamp(s):
    """Convert seconds to an ``HH:MM:SS.mmm`` string.

    Args:
        s: Time in seconds (int or float). Negative values are clamped to 0.

    Returns:
        The timestamp with zero-padded fields and millisecond precision.
    """
    # Round to whole milliseconds *before* splitting into fields. Formatting
    # the fractional seconds afterwards could round 59.9996 up to "60.000"
    # without carrying into the minutes field.
    total_ms = int(round(max(0.0, s) * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
def write_ffmetadata(chapters, out_path):
    """Write an FFmpeg metadata (ffmetadata) file with one section per chapter.

    Args:
        chapters: Iterable of dicts with ``start_time`` and ``end_time``
            (seconds) and ``title``.
        out_path: Destination file; written as UTF-8 and overwritten if it
            already exists.

    Each ``[CHAPTER]`` section uses ``TIMEBASE=1/1000``, so START/END are the
    chapter times rounded to whole milliseconds.
    """
    # ffmetadata requires '=', ';', '#', '\' and newline inside values to be
    # backslash-escaped; an unescaped title would corrupt the file.
    escape_table = str.maketrans({c: "\\" + c for c in "=;#\\\n"})

    lines = [";FFMETADATA1\n"]
    for ch in chapters:
        start_ms = int(round(ch["start_time"] * 1000))
        end_ms = int(round(ch["end_time"] * 1000))
        title = str(ch["title"]).translate(escape_table)
        lines.append(
            f"\n[CHAPTER]\n"
            f"TIMEBASE=1/1000\n"
            f"START={start_ms}\n"
            f"END={end_ms}\n"
            f"title={title}\n"
        )
    with open(out_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f" Wrote chapter metadata → {out_path}")
def natural_sort_key(s):
    """Return a sort key that orders embedded numbers numerically.

    With this key ``part-2`` sorts before ``part-10``. Text chunks are
    compared case-insensitively.

    Args:
        s: The string to build a key for.

    Returns:
        A list alternating lowercase text chunks (even positions) and ints
        (odd positions).
    """
    # re.split with a capturing group always yields text at even indices and
    # the captured digit runs at odd indices. Classifying by position avoids
    # str.isdigit(), which is also true for characters such as "²" that
    # int() cannot parse.
    chunks = re.split(r"(\d+)", s)
    return [
        int(chunk) if index % 2 else chunk.lower()
        for index, chunk in enumerate(chunks)
    ]
| # ── Core logic ──────────────────────────────────────────────────────────────── | |
def collect_parts(input_dir, pattern):
    """Find the part files in *input_dir* and return them in natural order.

    Args:
        input_dir: Directory to search. It is treated literally: glob
            metacharacters in it (e.g. ``Movie [2020]``) are escaped.
        pattern: Glob pattern for the file names inside *input_dir*.

    Returns:
        The matching paths, sorted by base name with ``natural_sort_key``.

    Exits the process with status 1 (after printing an error) when nothing
    matches.
    """
    # Only the pattern should be interpreted as a glob; escaping the directory
    # keeps brackets and wildcards in folder names from breaking the match.
    search = os.path.join(glob.escape(input_dir), pattern)
    files = glob.glob(search)
    if not files:
        # Report the un-escaped location so the message stays readable.
        print(f"ERROR: No files matched pattern '{os.path.join(input_dir, pattern)}'")
        sys.exit(1)
    files.sort(key=lambda p: natural_sort_key(os.path.basename(p)))
    return files
def build_merged_chapters(parts):
    """Combine the chapters of all parts onto one continuous timeline.

    Every part's chapters are shifted by the summed duration of the parts
    before it. A part without chapters contributes a single chapter that
    spans the whole part and is titled with the file's base name (extension
    removed). The end of the very last chapter is set to the total duration.

    Args:
        parts: Ordered iterable of part file paths.

    Returns:
        A tuple ``(merged_chapters, part_durations, total_duration)`` where
        ``merged_chapters`` is a list of dicts with ``start_time``,
        ``end_time`` and ``title``, ``part_durations`` lists each part's
        duration in seconds, and ``total_duration`` is their sum.
    """
    timeline = []
    durations = []
    elapsed = 0.0

    for part in parts:
        length = get_duration_seconds(part)
        durations.append(length)

        part_chapters = get_chapters(part)
        if not part_chapters:
            # Nothing to remap: cover the whole part with one chapter.
            name, _ext = os.path.splitext(os.path.basename(part))
            timeline.append({
                "start_time": elapsed,
                "end_time": elapsed + length,
                "title": name,
            })
        else:
            timeline.extend(
                {
                    "start_time": entry["start_time"] + elapsed,
                    "end_time": entry["end_time"] + elapsed,
                    "title": entry["title"],
                }
                for entry in part_chapters
            )
        elapsed += length

    # Pin the final chapter's end to the total running time.
    if timeline:
        timeline[-1]["end_time"] = elapsed
    return timeline, durations, elapsed
def merge_videos(parts, concat_list_path, video_only_path, dry_run):
    """Join the parts with ffmpeg's concat demuxer using stream copy.

    The concat list file is always written, even in dry-run mode.

    Args:
        parts: Ordered part file paths.
        concat_list_path: Where to write the concat list (UTF-8).
        video_only_path: Destination of the chapter-less merged file.
        dry_run: When true, only report what would be done.
    """
    # Inside a single-quoted concat entry, a literal quote is spelled '\''.
    entries = []
    for part in parts:
        quoted = str(part).replace("'", "'\\''")
        entries.append(f"file '{quoted}'\n")

    with open(concat_list_path, "w", encoding="utf-8") as listing:
        listing.writelines(entries)
    print(f" Wrote concat list → {concat_list_path}")

    if not dry_run:
        command = ["ffmpeg", "-y"]
        command += ["-f", "concat", "-safe", "0"]
        command += ["-i", concat_list_path]
        command += ["-c", "copy"]  # copy streams, no re-encode
        command += ["-map_chapters", "-1"]  # drop source chapters; merged ones are added later
        command.append(video_only_path)
        run(command)
        return

    print(f" [dry-run] Would concat {len(parts)} files → {video_only_path}")
def inject_chapters(video_only_path, metadata_path, output_path, dry_run):
    """Mux the merged chapter metadata into the concatenated file.

    Args:
        video_only_path: The concatenated file (first ffmpeg input).
        metadata_path: The ffmetadata file (second ffmpeg input).
        output_path: Final output file.
        dry_run: When true, only report what would be done.
    """
    if not dry_run:
        command = ["ffmpeg", "-y", "-i", video_only_path, "-i", metadata_path]
        # Both metadata and chapters are taken from input #1, the ffmetadata file.
        command += ["-map_metadata", "1", "-map_chapters", "1"]
        command += ["-c", "copy", output_path]
        run(command)
        return

    print(f" [dry-run] Would inject chapters → {output_path}")
| # ── CLI ─────────────────────────────────────────────────────────────────────── | |
def parse_args():
    """Parse the command-line options from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``input_dir``, ``pattern``, ``output``,
        ``work_dir`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(
        description="Merge multipart MKV files with chapter remapping.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,  # the module docstring doubles as extended help
    )

    # (flag, default, help) for every option that takes a value.
    valued_options = (
        ("--input-dir", ".", "Directory with part files"),
        ("--pattern", "*part*.mkv", "Glob pattern for parts"),
        ("--output", "merged.mkv", "Output MKV file"),
        ("--work-dir", None, "Temp directory (default: system temp)"),
    )
    for flag, default, help_text in valued_options:
        parser.add_argument(flag, default=default, help=help_text)

    parser.add_argument("--dry-run", action="store_true", help="Print plan, do not execute")
    return parser.parse_args()
def main():
    """Command-line entry point: discover parts, merge them, add chapters.

    Prints the plan (parts, total duration, merged chapter list). In dry-run
    mode it stops there. Otherwise it concatenates the parts, writes the
    chapter metadata, muxes both into the output file, reports the output's
    duration and chapter count, and removes the intermediate merged file.
    """
    opts = parse_args()
    source_dir = os.path.abspath(opts.input_dir)
    target = os.path.abspath(opts.output)

    rule = "=" * 60
    print(f"\n{rule}")
    print(" MKV Part Merger")
    print(rule)
    print(f" Input dir : {source_dir}")
    print(f" Pattern : {opts.pattern}")
    print(f" Output : {target}")
    print(f" Dry run : {opts.dry_run}")
    print()

    # 1. Discover parts
    part_files = collect_parts(source_dir, opts.pattern)
    print(f"Found {len(part_files)} part(s):")
    for number, part in enumerate(part_files, 1):
        print(f" {number:2}. {os.path.basename(part)}")
    print()

    # 2. Build merged chapter list
    print("Reading durations & chapters from each part…")
    chapter_list, _part_durations, total = build_merged_chapters(part_files)
    print(f"\nTotal merged duration: {seconds_to_timestamp(total)}")
    print(f"Merged chapter list ({len(chapter_list)} chapters):")
    for number, chapter in enumerate(chapter_list, 1):
        begin = seconds_to_timestamp(chapter["start_time"])
        finish = seconds_to_timestamp(chapter["end_time"])
        print(f" {number:3}. [{begin} → {finish}] {chapter['title']}")
    print()

    if opts.dry_run:
        print("[dry-run] No files will be written. Re-run without --dry-run to execute.")
        return

    # 3. Work in a temp directory (a fresh one unless --work-dir was given)
    scratch = opts.work_dir
    if not scratch:
        scratch = tempfile.mkdtemp(prefix="mkv_merge_")
    os.makedirs(scratch, exist_ok=True)
    concat_list = os.path.join(scratch, "concat_list.txt")
    video_only = os.path.join(scratch, "merged_no_chapters.mkv")
    metadata_file = os.path.join(scratch, "chapters.ffmetadata")

    # 4. Concatenate streams
    print("Step 1/3 — Concatenating video/audio streams (stream-copy, no re-encode)…")
    merge_videos(part_files, concat_list, video_only, dry_run=False)

    # 5. Write chapter metadata
    print("\nStep 2/3 — Writing merged chapter metadata…")
    write_ffmetadata(chapter_list, metadata_file)

    # 6. Inject chapters into final file
    print("\nStep 3/3 — Injecting chapters into output file…")
    inject_chapters(video_only, metadata_file, target, dry_run=False)

    # 7. Verify by probing the finished file
    print("\nVerifying output…")
    final_chapters = get_chapters(target)
    final_duration = get_duration_seconds(target)
    print(f" Output duration : {seconds_to_timestamp(final_duration)}")
    print(f" Output chapters : {len(final_chapters)}")

    # Best-effort removal of the intermediate file; the rest of the work
    # directory is left in place for debugging.
    try:
        os.remove(video_only)
    except OSError:
        pass

    print(f"\n✓ Done! Output: {target}\n")
# Run the merger only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment