Last active
August 31, 2022 06:24
-
-
Save Flushot/8c3eaa918e21629765d9d99d8a6db889 to your computer and use it in GitHub Desktop.
Vimeo video downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Run: pip install argparse requests tqdm | |
# Requires that 'ffmpeg' is also installed on your system
import os | |
import sys | |
import base64 | |
import tempfile | |
import subprocess | |
import io | |
import shutil | |
from contextlib import closing | |
from urllib.parse import urljoin | |
from typing import * | |
from typing import BinaryIO | |
import argparse | |
import requests | |
import ffmpeg | |
from tqdm.auto import tqdm | |
def fetch_stream_segments(manifest_url: str,
                          stream_type: str,
                          best_key: Callable[[Dict], Any],
                          timeout: float = 30.0) -> Generator[BinaryIO, None, None]:
    """Yield the segments of one media stream as readable binary file objects.

    Downloads the JSON manifest at ``manifest_url``, picks the entry of
    ``manifest[stream_type]`` for which ``best_key`` is largest, and yields
    that stream's segments in manifest order. The base64-encoded
    ``init_segment`` (when present) is yielded first as an in-memory buffer;
    every following item is the raw body of a streamed HTTP response.

    A yielded network segment must be fully consumed before the generator is
    advanced, because the underlying response is closed as soon as the
    generator resumes.

    Args:
        manifest_url: URL of the JSON manifest.
        stream_type: Key of the manifest's stream list (``main`` passes
            ``'video'`` and ``'audio'``).
        best_key: One-argument ranking function handed to ``max()``; it
            receives a stream dict from the manifest.
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled server cannot hang the download forever.

    Raises:
        ValueError: If the manifest lists no streams for ``stream_type``.
        KeyError: If the manifest lacks an expected key.
        requests.HTTPError: If the manifest or a segment request fails.
    """
    # NOTE(review): the -26 offset presumably skips a fixed-length URL tail
    # (such as "/master.json?base64_init=1") so that the slash found is the
    # one before the manifest's directory -- confirm against real manifest
    # URLs; any other tail length yields a different base URL.
    base_url = manifest_url[:manifest_url.rfind('/', 0, -26) + 1]

    # Download and parse manifest
    with closing(requests.get(manifest_url, timeout=timeout)) as res:
        res.raise_for_status()
        manifest = res.json()

    # Pick the stream that ranks highest according to best_key
    candidates = manifest[stream_type]
    if not candidates:
        raise ValueError(f'Manifest contains no {stream_type!r} streams')
    stream = max(candidates, key=best_key)

    # Yield first base64-encoded segment
    if 'init_segment' in stream:
        yield io.BytesIO(base64.b64decode(stream['init_segment'].encode('ascii')))

    # Iterate and yield the remaining segments (in order)
    segments = stream['segments']
    progress = tqdm(enumerate(segments),
                    total=len(segments),
                    unit='segment',
                    desc=f'Fetching {stream_type} segments')
    for i, segment in progress:
        segment_url = urljoin(base_url, stream['base_url'] + segment['url'])
        tqdm.write(f"Fetching segment #{i}: {segment['start']} - {segment['end']} ({segment['size'] / 1024 / 1024:.1f} MB)")
        # stream=True avoids loading the whole segment into memory; closing()
        # releases the connection once the consumer asks for the next item.
        with closing(requests.get(segment_url, stream=True, timeout=timeout)) as res:
            res.raise_for_status()
            yield res.raw
def join_media_files(video_file: str, audio_file: str, output_file: str) -> None:
    """Merge a video-only and an audio-only file into one output file using ffmpeg.

    Both streams are copied without re-encoding. An existing ``output_file``
    is overwritten.

    Args:
        video_file: Path of the file supplying the video stream.
        audio_file: Path of the file supplying the audio stream.
        output_file: Path of the combined file to write.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    command = [
        'ffmpeg',
        '-y',                    # overwrite the output file without prompting
        '-an', '-i', video_file,  # first input: ignore any audio it carries
        '-vn', '-i', audio_file,  # second input: ignore any video it carries
        '-c:a', 'copy',          # copy streams as-is instead of re-encoding
        '-c:v', 'copy',
        output_file,
    ]
    # check=True turns an ffmpeg failure into an exception, so the caller
    # does not report success for an output file that was never produced.
    subprocess.run(command, check=True)
def main() -> None:
    """Command-line entry point: download a Vimeo video and save it as one file.

    Parses the manifest URL and options from ``sys.argv``, downloads the
    chosen video and audio streams into a temporary directory, then merges
    them into the requested output file.
    """
    default_output_file = 'output.mp4'

    parser = argparse.ArgumentParser(description='Download a video from Vimeo')
    parser.add_argument(
        'manifest_url',
        help='Vimeo video manifest URL (find this by searching page network requests for "media.json")',
    )
    parser.add_argument(
        '-o', '--output',
        default=default_output_file,
        help=f'Output video file (default: {default_output_file})',
    )
    parser.add_argument(
        '--low-quality',
        action='store_true',
        help='Download lowest quality video (fastest)',
    )
    options = parser.parse_args(sys.argv[1:])

    # Negating the ranking value makes max() select the smallest stream.
    sign = -1 if options.low_quality else 1

    def ranked_by(field: str) -> Callable[[Dict], Any]:
        """Build a ranking function over the given stream field."""
        return lambda stream: sign * stream[field]

    with tempfile.TemporaryDirectory() as workdir:
        video_path = os.path.join(workdir, 'video.mp4')
        audio_path = os.path.join(workdir, 'audio.mp4')

        with open(video_path, 'wb+') as video_out, open(audio_path, 'wb+') as audio_out:
            # Video is ranked by frame height, audio by bitrate; each
            # stream's segments are appended to its own file in order.
            downloads = (
                ('video', 'height', video_out),
                ('audio', 'bitrate', audio_out),
            )
            for stream_type, field, sink in downloads:
                for chunk in fetch_stream_segments(options.manifest_url, stream_type,
                                                   best_key=ranked_by(field)):
                    shutil.copyfileobj(chunk, sink)

        # Both files are closed (and therefore flushed) at this point, but the
        # temporary directory still exists for ffmpeg to read from.
        join_media_files(video_path, audio_path, options.output)
        print(f'Saved output to: {options.output}')
# Run the downloader only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment