Skip to content

Instantly share code, notes, and snippets.

@leocxy
Created October 1, 2022 08:17
Show Gist options
  • Save leocxy/ca44873b478374574d77dd89a9ca83e8 to your computer and use it in GitHub Desktop.
Save leocxy/ca44873b478374574d77dd89a9ca83e8 to your computer and use it in GitHub Desktop.
Download the MPEG2-TS segment files and combine them into a single MP4 file.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Project : programming
# @File : mpeg2_download_and_combine.py
# @Author : Leo Chen <[email protected]>
# @Date : 2022/9/29 20:19
"""
import os
from concurrent.futures import ThreadPoolExecutor
from os import path, makedirs
from logging import Formatter, Logger
from logging.handlers import RotatingFileHandler
from time import sleep
from random import randint
from tqdm import tqdm
from re import search
import requests
# Root directory that holds the log file and the per-video download folder.
FOLD_PATH = '/absolute/path'
# Sub-folder of FOLD_PATH that receives the downloaded segments.
FOLD_NAME = 'folder_name'
# Segment URL template; "{}" is replaced with the segment number.
URL_PATH = "https://xxx.xxx.xxx/250aa7c0-d610-4b56-acdd-8c2207ee9db6/1280x720/video{}.ts"
# Segments are requested with the numbers 1..MAX_NUM.
MAX_NUM = 2110

# Create the download folder (and FOLD_PATH along with it) *before* the log
# handler is built: RotatingFileHandler opens logs.log right away and would
# fail if FOLD_PATH did not exist yet. exist_ok=True also removes the
# check-then-create race of a separate path.exists() test.
makedirs(path.join(FOLD_PATH, FOLD_NAME), exist_ok=True)

# A directly instantiated Logger has no parent, so its effective level stays
# NOTSET and DEBUG records reach the handler below.
logger = Logger('download_logger')
handler = RotatingFileHandler(path.join(FOLD_PATH, 'logs.log'), maxBytes=10240000, backupCount=5)
handler.setLevel('DEBUG')
handler.setFormatter(Formatter('[%(asctime)s] %(threadName)s %(levelname)s: %(message)s'))
logger.addHandler(handler)
def set_request_headers():
    """Build the HTTP headers sent with every segment request.

    Returns:
        A new dict on each call, combining the site identification headers
        with the browser client-hint / fetch-metadata headers.
    """
    site_headers = {
        'authority': 'xxx.com',
        'origin': 'https://xxx.com',
        'referer': 'https://xxx.com/en/yyy',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
    }
    browser_hints = {
        'sec-ch-ua': '"Google Chrome";v="105", "Not)A;Brand";v="8", "Chromium";v="105"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-site',
    }
    # Unpacking keeps the original key order: site headers, then hints.
    return {**site_headers, **browser_hints}
def download_mpeg2_ts(url: str, num: int):
    """Download one segment into the download folder, unless it already exists.

    The segment is first written to a temporary ``.part`` file and only moved
    to its final name once it is completely on disk. A failed download
    therefore never leaves an empty or truncated file behind that a later run
    would mistake for an already-downloaded segment.

    Args:
        url: URL template containing one ``{}`` placeholder for the segment
            number.
        num: Segment number substituted into ``url``.

    Returns:
        None. Network and file-system failures are logged (with traceback)
        and not raised, so one bad segment does not abort the whole run.
    """
    url = url.format(num)
    filename = url.split('/')[-1]
    filepath = path.join(FOLD_PATH, FOLD_NAME, filename)
    if path.exists(filepath):
        # Fetched by an earlier run; nothing to do (and no delay needed).
        return

    tmp_path = filepath + '.part'
    try:
        res = requests.get(url, headers=set_request_headers(), timeout=120)
        res.raise_for_status()
        logger.debug('正下载第 {} 个片段'.format(num))
        with open(tmp_path, 'wb') as f:
            f.write(res.content)
        # Atomic rename: the final name appears only for a complete file.
        os.replace(tmp_path, filepath)
    except (requests.RequestException, OSError):
        # Narrow handler (instead of a bare except) so KeyboardInterrupt and
        # programming errors still propagate; the traceback goes to the log.
        logger.exception('第 {} 个片段下载失败'.format(num))
        if path.exists(tmp_path):
            os.remove(tmp_path)
    # Random pause between requests.
    sleep(randint(1, 3))
def thread_callback(task):
    """Log the exception, if any, raised by a finished download task.

    Args:
        task: The completed ``concurrent.futures.Future`` this callback was
            attached to.
    """
    if task.cancelled():
        # Future.exception() raises CancelledError for a cancelled future.
        return
    if error := task.exception():
        # This runs outside any ``except`` block, so logger.exception() would
        # find no active exception and log no traceback. Passing the
        # exception instance as exc_info attaches the real one.
        logger.error(error, exc_info=error)
def multiple_threads():
    """Download segments 1..MAX_NUM using a pool of two worker threads.

    Each submitted download gets ``thread_callback`` attached as its done
    callback. Leaving the ``with`` block waits for all submitted tasks.
    """
    segment_numbers = range(1, MAX_NUM + 1)
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix='worker') as executor:
        for segment_no in segment_numbers:
            future = executor.submit(download_mpeg2_ts, url=URL_PATH, num=segment_no)
            future.add_done_callback(thread_callback)
def single_process():
    """Download segments 1..MAX_NUM one after another with a progress bar."""
    progress = tqdm(range(MAX_NUM), desc='Downloading: ')
    for index in progress:
        # The range is 0-based, segment numbers start at 1.
        segment_no = index + 1
        download_mpeg2_ts(URL_PATH, segment_no)
def sort_by_name(item):
    """Return a sort key that orders file names by the first number they contain.

    Args:
        item: File name, e.g. ``'video12.ts'``.

    Returns:
        A ``(number, item)`` tuple, where ``number`` is the first run of
        digits in ``item`` as an int, or 0 when the name has no digits. The
        name itself breaks ties between equal numbers.
    """
    # Raw string: a plain '\d' is an invalid escape sequence and triggers a
    # DeprecationWarning / SyntaxWarning on current Python versions.
    match = search(r'\d+', item)
    number = int(match.group()) if match else 0
    return (number, item)
def combine_video():
    """Concatenate the downloaded ``.ts`` segments into ``<FOLD_NAME>.mp4``.

    Segments are taken from the download folder in numeric order (see
    ``sort_by_name``) and their bytes are appended one after another to the
    output file, which is created inside the same folder.

    Only ``.ts`` files are used as input. The output file lives in the folder
    being listed, so without this filter a second run would read the previous
    output as if it were a segment. The output is opened in ``'wb'`` mode so
    that a rerun replaces the old result rather than appending a duplicate
    copy of every segment to it.

    NOTE: this is a plain byte-level concatenation; no remuxing is performed.
    """
    folder_path = path.join(FOLD_PATH, FOLD_NAME)
    video_path = path.join(folder_path, '{}.mp4'.format(FOLD_NAME))
    segments = sorted(
        (name for name in os.listdir(folder_path) if name.lower().endswith('.ts')),
        key=sort_by_name,
    )
    with open(video_path, 'wb') as f:
        for file in tqdm(segments, desc='Combining video file: '):
            filepath = path.join(folder_path, file)
            with open(filepath, 'rb') as f1:
                # Copy in 1 MiB chunks instead of loading a whole segment
                # into memory at once.
                while chunk := f1.read(1024 * 1024):
                    f.write(chunk)
if __name__ == '__main__':
    # Script entry point. Exactly one step is enabled at a time:
    #   multiple_threads()  - download with the two-worker thread pool
    #   single_process()    - download sequentially with a progress bar
    #   combine_video()     - merge the downloaded segments into one file
    # Swap the call below to run a different step.
    single_process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment