Last active
May 17, 2022 22:47
-
-
Save zrong/497b2782056047af0bb228bb2e335dff to your computer and use it in GitHub Desktop.
解析 m3u8 格式并下载,合并成一个大文件。
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
############################
# 解析 m3u8 格式并下载
#
# @created: 2022-05-17
# @author: zrong
############################
import m3u8 | |
import click | |
import time | |
from pathlib import Path | |
import httpx | |
import asyncio | |
def _get_file(d: Path, uri: str) -> Path: | |
return d.joinpath(uri.split('/')[-1]) | |
def merge_file(playlist: m3u8.M3U8, d: Path, big_file: Path):
    """Concatenate the downloaded segments of ``playlist`` into ``big_file``.

    Segments are handled in playlist order. Each one is read from ``d``,
    appended to ``big_file`` and then deleted from ``d``.

    :param playlist: Parsed playlist whose ``segments`` give the order.
    :param d: Directory containing the downloaded segment files.
    :param big_file: Output file; opened in append mode, so existing content
        is kept and the segments are added after it.
    """
    with big_file.open(mode='a+b') as merged:
        for seg in playlist.segments:
            part: Path = _get_file(d, seg.uri)
            payload = part.read_bytes()
            merged.write(payload)
            # The segment is no longer needed once it is in the output file.
            part.unlink()
async def dl_file(d: Path, uri: str, client: httpx.AsyncClient):
    """Download a single segment and save it inside ``d``.

    :param d: Directory the segment is written to.
    :param uri: URI of the segment; passed unchanged to ``client.get``.
    :param client: ``httpx.AsyncClient`` used to perform the request.
    :raises httpx.HTTPStatusError: If the response status indicates a
        failure, so an error page is never stored as segment data.
    """
    file: Path = _get_file(d, uri)
    click.echo(f'正在下载 {file!s}')
    resp: httpx.Response = await client.get(uri, timeout=100)
    # Fail loudly on HTTP errors; otherwise the error body would be written
    # to disk and later merged into the output video.
    resp.raise_for_status()
    file.write_bytes(resp.content)
async def dl_segments(playlist: m3u8.M3U8, d: Path):
    """Download every segment of ``playlist`` into ``d`` concurrently.

    One ``dl_file`` coroutine is started per segment and all of them share a
    single HTTP client, which is closed when the downloads finish or fail.

    :param playlist: Parsed playlist whose ``segments`` are downloaded.
    :param d: Directory the segment files are written to.
    """
    # ``async with`` guarantees the client (and its connection pool) is
    # closed even when one of the downloads raises.
    async with httpx.AsyncClient() as client:
        await asyncio.gather(
            *(
                # ``absolute_uri`` resolves relative segment URIs against the
                # playlist's base URI; absolute URLs are returned unchanged.
                dl_file(d, segment.absolute_uri, client)
                for segment in playlist.segments
            )
        )
# Command-line entry point: download and merge the video stream described by
# one or more m3u8 playlists.
#
# Playlists are given as local files (--file) and/or URLs (--url); both options
# may be repeated and may be combined. For each playlist the segments are
# downloaded into a temporary directory, concatenated into a single ``.ts``
# file and the temporary directory is removed afterwards.
#
# * --file playlists: the output is written next to the playlist and named
#   after the part of the file name before its first dot.
# * --url playlists: the output is written to the current working directory
#   and named after a Unix timestamp.
#
# Raises click.FileError when neither --file nor --url is supplied.
#
# NOTE: the function docstring is the help text click prints for ``--help``,
# so it is runtime output and is kept verbatim.
@click.command()
@click.option('--url', type=str, multiple=True, help='支持 URL。')
@click.option(
    '--file',
    type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path),
    multiple=True,
    help='支持本地路径。',
)
def download(url: list[str], file: list[Path]):
    """下载并合并 m3u8 中的视频流。"""
    if not url and not file:
        raise click.FileError('None', hint='请检查 file 或者 url 参数。')

    current_dir: Path = Path.cwd()

    # Local playlists first, then URLs. Both kinds are processed, so a --url
    # is no longer silently ignored when a --file is also given.
    jobs = [
        {
            'cwd': f.parent,
            'url': f.resolve().as_posix(),
            'name': f.name.split('.')[0] + '.ts',
        }
        for f in file
    ]
    jobs += [{'cwd': current_dir, 'url': u} for u in url]

    for job in jobs:
        work_dir: Path = job['cwd']
        name = job.get('name')

        # The timestamp only has one-second resolution. Bump it until neither
        # the temporary directory nor (for URL playlists) the default output
        # file exists, so two playlists handled within the same second can
        # never share a directory or be appended to the same output file.
        stamp = int(time.time())
        while work_dir.joinpath(str(stamp)).exists() or (
            name is None and work_dir.joinpath(f'{stamp}.ts').exists()
        ):
            stamp += 1
        tmp_name = str(stamp)

        if name is None:
            name = f'{tmp_name}.ts'
        big_file: Path = work_dir.joinpath(name)

        tmp_dir: Path = work_dir.joinpath(tmp_name).resolve()
        tmp_dir.mkdir(exist_ok=True)
        click.echo(f'创建临时文件夹: {tmp_dir!s}。')

        # Parse the playlist, then fetch all of its segments.
        playlist = m3u8.load(job['url'])
        asyncio.run(dl_segments(playlist, tmp_dir))

        click.echo(f'正在合并 {len(playlist.segments)} 个文件。')
        merge_file(playlist, tmp_dir, big_file)
        click.echo(f'合并完成,写入文件 {big_file!s}。')

        # merge_file deletes each segment after copying it, so the directory
        # is expected to be empty here.
        tmp_dir.rmdir()
        click.echo(f'删除临时文件夹: {tmp_dir!s}。')
# Run the click command only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == '__main__':
    download()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment