|
#!/usr/bin/python3 |
|
import time |
|
import subprocess |
|
import sys |
|
import os |
|
import abc |
|
import threading |
|
import random |
|
from pathlib import Path |
|
import tempfile |
|
|
|
# Process-wide serial number used to build pipe names; _lock guards every
# update so names handed out from different threads never share a serial.
_id = 0
_lock = threading.Lock()


class NamedPipeBase(abc.ABC):
    """Abstract named pipe with an automatically generated name.

    Constructing an instance picks a fresh name, prints it and then calls
    the subclass's ``_create()``.  Subclasses provide ``_create``, ``open``,
    ``write`` and ``close``.
    """

    # Declared for subclasses to fill in; this base class never assigns it.
    path: Path

    def __init__(self):
        self.name = self._allocate_name()
        print(f"Creating pipe {self.name}")
        self._create()

    @staticmethod
    def _allocate_name():
        """Return a new pipe name built from the PID, a serial and a random suffix."""
        global _id
        with _lock:
            _id += 1
            serial = _id
            suffix = random.randint(0, 9999)
        return f"streamlinkpipe-{os.getpid()}-{serial}-{suffix}"

    @abc.abstractmethod
    def _create(self) -> None:
        """Create the underlying pipe object (called once from ``__init__``)."""
        raise NotImplementedError

    @abc.abstractmethod
    def open(self) -> None:
        """Open the pipe for use."""
        raise NotImplementedError

    @abc.abstractmethod
    def write(self, data) -> int:
        """Write *data* to the pipe and return an integer result."""
        raise NotImplementedError

    @abc.abstractmethod
    def close(self) -> None:
        """Close the pipe."""
        raise NotImplementedError
|
|
|
class NamedPipePosix(NamedPipeBase):
    """Named pipe implemented as a FIFO in the system temporary directory."""

    mode = "wb"           # the FIFO is opened for binary writing
    permissions = 0o660   # mode bits passed to os.mkfifo
    fifo = None           # file object for the FIFO once open() has run

    def _create(self):
        """Create the FIFO node at ``<tempdir>/<self.name>``."""
        self.path = Path(tempfile.gettempdir(), self.name)
        os.mkfifo(self.path, self.permissions)

    def open(self):
        """Open the FIFO using ``self.mode`` and keep the file object."""
        self.fifo = open(self.path, self.mode)

    def write(self, data):
        """Write *data* to the open FIFO and return the file object's result."""
        return self.fifo.write(data)

    def close(self):
        """Close the FIFO (if open) and remove its filesystem entry.

        The entry is removed even when closing the file object raises (for
        example when flushing buffered data fails); the exception from the
        close still propagates to the caller afterwards.  Calling this more
        than once is harmless.
        """
        try:
            if self.fifo:
                self.fifo.close()
        finally:
            self.fifo = None
            if self.path.is_fifo():
                os.unlink(self.path)
|
|
|
# Only a POSIX FIFO implementation is defined above.
NamedPipe = NamedPipePosix

# --- Command-line rewriting --------------------------------------------------
# The script receives an ffmpeg-style command line.  "-headers <value>" and
# "-i <value>" pairs are pulled out, a "file:<path>" argument is replaced by
# "-" (ffmpeg's stdout) with the path remembered in `output`, and every other
# argument is forwarded unchanged through `newargs`.
args = sys.argv[1:]
newargs = []
headers = None
inputs = []
output = None
print("Input args:", args)

i = 0
while i < len(args):
    c = args[i]
    # A value-taking option that is the very last argument has no value to
    # consume; it is forwarded untouched so ffmpeg can report the problem
    # instead of this script dying with an IndexError.
    if c == '-headers' and i + 1 < len(args):
        headers = args[i + 1]
        i += 2
        continue
    if c == '-i' and i + 1 < len(args):
        inputs.append(args[i + 1])
        i += 2
        continue
    if c.startswith('file:'):
        # NOTE(review): empty_moov is presumably required because the muxed
        # output goes to a non-seekable stdout -- confirm.
        newargs += ['-movflags', 'empty_moov']
        newargs.append('-')
        output = c[5:]
        i += 1
        continue
    newargs.append(c)
    i += 1

print("Got header: ", headers)

# Turn the CRLF-separated "Name: value" block into a dict.  A missing
# "-headers" option yields an empty dict, and each line is split on the first
# ": " only so values that themselves contain ": " stay intact.
raw_headers = headers or ''
headers = {}
for line in raw_headers.strip('\r\n').split('\r\n'):
    name, sep, value = line.partition(': ')
    if sep:
        headers[name] = value
    elif line:
        print("Ignoring malformed header line:", repr(line))

# Anything other than exactly two inputs is handed to the real ffmpeg with
# the original, unmodified arguments.
if len(inputs) != 2:
    subprocess.call(['/usr/bin/ffmpeg'] + args)
    time.sleep(5.0)
    sys.exit(0)
|
|
|
import requests |
|
from requests.adapters import HTTPAdapter |
|
from urllib3 import Retry |
|
|
|
from multiprocessing.pool import ThreadPool |
|
|
|
DOWN_THREAD = 12           # worker threads used per multithread_download call
CHUNK_SIZE = 512*1024      # bytes requested by each ranged GET


def multithread_download(sess, url, headers, downRange, retry_delay=1.0):
    """Download the byte range ``downRange`` of *url* with parallel ranged GETs.

    Args:
        sess: object with a ``get(url, headers=...)`` method whose result has
            ``status_code`` and ``content`` attributes.
        url: address to fetch.
        headers: mapping of request headers; it is copied, never modified.
        downRange: ``(start, end)`` byte offsets, ``end`` exclusive.
        retry_delay: seconds to wait before retrying a failed chunk.

    Returns:
        The chunk bodies concatenated in range order as ``bytes``.

    A chunk is retried until it succeeds; there is no upper bound on the
    number of attempts.
    """
    def downChunk(chunkRange):
        import traceback

        newhdr = dict(headers)
        # The HTTP Range header uses an inclusive end offset.
        newhdr['Range'] = 'bytes=%d-%d' % (chunkRange[0], chunkRange[1] - 1)
        while True:
            try:
                r = sess.get(url, headers=newhdr)
                # Explicit check instead of `assert`, which disappears under
                # `python -O` and would let a non-partial response through.
                if r.status_code != 206:
                    raise RuntimeError(
                        'expected HTTP 206 for %s, got %s'
                        % (newhdr['Range'], r.status_code)
                    )
                return r.content
            except Exception:
                traceback.print_exc()
                # Pause so a persistently failing chunk does not hot-spin.
                time.sleep(retry_delay)

    chunkRanges = [
        (start, min(downRange[1], start + CHUNK_SIZE))
        for start in range(downRange[0], downRange[1], CHUNK_SIZE)
    ]
    if not chunkRanges:
        return b''
    # The context manager tears the pool down on exit, so worker threads do
    # not accumulate across calls.
    with ThreadPool(DOWN_THREAD) as pool:
        chunks = pool.map(downChunk, chunkRanges)
    return b''.join(chunks)
|
|
|
# Manual smoke test (the first argument must be a requests session):
# t = multithread_download(requests.Session(), "http://ipv4.download.thinkbroadband.com/200MB.zip", {}, (100 * 1024, 5000 * 1024))
|
|
|
IOCHUNK = 4 * 1024 * 1024  # bytes fetched per multithread_download call


def copy_to_pipe(inputUrl, pipe):
    """Stream the resource at *inputUrl* into *pipe*, then close the pipe.

    The total size is taken from the ``Content-Length`` of a HEAD request and
    the body is fetched in ``IOCHUNK``-sized windows via
    ``multithread_download``.  Request headers come from the module-level
    ``headers`` mapping.  Errors are printed as a traceback rather than
    raised; ``pipe.close()`` runs in every case.

    Args:
        inputUrl: address of the resource to download.
        pipe: object providing ``open()``, ``write(data)`` and ``close()``.
    """
    try:
        # Opened inside the try block so a failing open() still reaches the
        # cleanup in `finally`.
        pipe.open()
        with requests.Session() as sess:
            retry_strategy = Retry(
                connect=4,
                read=2,
                total=8,
            )
            sess.mount("https://", HTTPAdapter(max_retries=retry_strategy))
            sess.mount("http://", HTTPAdapter(max_retries=retry_strategy))
            r = sess.head(inputUrl, headers=headers, allow_redirects=True)
            # Explicit check instead of `assert`, which `python -O` removes.
            if r.status_code != 200:
                raise RuntimeError(
                    "HEAD request returned HTTP %s" % (r.status_code,)
                )
            fulllen = int(r.headers['Content-Length'])
            print("Got full len: %d" % (fulllen))
            curpos = 0
            while curpos < fulllen:
                downRange = (curpos, min(curpos + IOCHUNK, fulllen))
                retData = multithread_download(sess, inputUrl, headers, downRange)
                if not retData:
                    # No progress would make this loop spin forever.
                    raise RuntimeError(
                        "empty download result at offset %d" % (curpos,)
                    )
                pipe.write(retData)
                curpos += len(retData)
                # Release the window before the next one is allocated.
                del retData
    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        pipe.close()
|
|
|
# --- Two-input path ----------------------------------------------------------
# One named pipe and one writer thread per input URL; ffmpeg then reads the
# pipes instead of the URLs.
pipes = [NamedPipe() for _ in inputs]
pipe_threads = [
    threading.Thread(target=copy_to_pipe, args=(inputUrl, pipe))
    for inputUrl, pipe in zip(inputs, pipes)
]
for thread in pipe_threads:
    thread.start()

# NOTE(review): the reason for this fixed delay is not evident from the code;
# presumably it gives the writer threads a head start -- confirm.
time.sleep(4.0)

newargs = ['-i', str(pipes[0].path), '-i', str(pipes[1].path)] + newargs
print(newargs)
if output is None:
    # No "file:<path>" argument was seen, so nothing was redirected to "-";
    # run ffmpeg with the forwarded arguments and leave its stdout alone
    # instead of failing on open(None) with the writer threads already started.
    subprocess.call(['/usr/bin/ffmpeg'] + newargs)
else:
    # ffmpeg was told to write to "-" (stdout); capture that into the file
    # named by the original "file:<path>" argument.
    with open(output, 'wb') as f:
        subprocess.call(['/usr/bin/ffmpeg'] + newargs, stdout=f)

time.sleep(5.0)
sys.exit(0)