Last active
October 25, 2019 04:43
-
-
Save Hellowlol/5f8545e999259b4371c91ac223409209 to your computer and use it in GitHub Desktop.
tqdm thread subprocess ffmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import re | |
import subprocess | |
import time | |
def example_func(*args): | |
for i in range(1, 101): | |
s = random.choice([0.1, 0.3, 0.5, 0.7, 0.9]) | |
time.sleep(s) | |
yield i | |
def to_ms(s=None, des=None, **kwargs): | |
if s: | |
hour = int(s[0:2]) | |
minute = int(s[3:5]) | |
sec = int(s[6:8]) | |
ms = int(s[10:11]) | |
else: | |
hour = int(kwargs.get('hour', 0)) | |
minute = int(kwargs.get('min', 0)) | |
sec = int(kwargs.get('sec', 0)) | |
ms = int(kwargs.get('ms')) | |
result = (hour * 60 * 60 * 1000) + (minute * 60 * 1000) + (sec * 1000) + ms | |
if des and isinstance(des, int): | |
return round(result, des) | |
return result | |
def stream_ffmpeg(filename, url, *args): | |
durr = None | |
dur_regex = re.compile(r'Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})') | |
time_regex = re.compile(r'\stime=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})') | |
cmd = 'ffmpeg -i %s -n -vcodec copy -acodec ac3 "%s.mkv"' % (url, filename) | |
process = subprocess.Popen(cmd, | |
shell=False, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
universal_newlines=True) | |
for line in iter(process.stderr): | |
if durr is None and dur_regex.search(line): | |
dur = dur_regex.search(line).groupdict() | |
dur = to_ms(**dur) | |
result = time_regex.search(line) | |
if result and result.group('hour'): | |
elapsed_time = to_ms(**result.groupdict()) | |
yield elapsed_time / dur * 100 | |
# Since we might miss the last tick | |
yield 100 | |
def multi_progress_thread(tasks, func=None, workers=None): | |
""" tqdm nested helper if we dont know the range of the iterable and using treads. | |
The idea is pretty simple. Each task reports the progress to the queue | |
We read the queue and increment the progress bars. | |
tasks (list): of tuples, used as args for func | |
func: Teh function to execute in the threads, func must yield a progress in int | |
""" | |
from functools import partial | |
try: | |
from queue import Queue as queue | |
except ImportError: | |
from Queue import Queue as queue | |
import workerpool | |
import tqdm | |
class JOBZ(workerpool.Job): | |
def __init__(self, q, task_number, func=None): | |
"""Args: | |
q: queue | |
n: tasknumber | |
fun | |
""" | |
self.q = q | |
self.n = task_number | |
self.func = func | |
def run(self): | |
for i in self.func(): | |
self.q.put((self.n, i)) | |
# Set check done. | |
self.q.put((self.n, 'done')) | |
assert callable(func) | |
q = queue() | |
len_tasks = len(tasks) | |
w = len_taks is workers is None else workers | |
pool = workerpool.WorkerPool(size=w) | |
bars = [] | |
main_bar = tqdm.tqdm(total=len_tasks, position=0) | |
for i, task in enumerate(tasks): | |
# Increment the position since we want the | |
# main_bar to be on top. | |
pos = i + 1 | |
wrap_func = partial(func, *task) | |
j = JOBZ(q, i, wrap_func) | |
pool.put(j) | |
f = partial(tqdm.tqdm, total=100, position=pos, desc='T %s' % i) | |
bars.append(f()) | |
try: | |
exit = 0 | |
progress = {} | |
while True: | |
# Check if we should exit the thread and update the bars | |
if exit == len_tasks: | |
# Update sub-bars because of the exit | |
for bb in bars: | |
bb.n = 0 | |
bb.update(100) | |
# Update main bar. | |
main_bar.n = 0 | |
main_bar.update(len_tasks) | |
break | |
item = q.get() | |
if item is not None: | |
t, i = item | |
if i == 'done': | |
exit += 1 | |
continue | |
b = bars[t] | |
b.n = 0 | |
b.update(i) | |
progress[str(t)] = i | |
main_bar_progress = sum(progress.values()) / 100 | |
main_bar.n = 0 | |
main_bar.update(main_bar_progress) | |
except KeyboardInterrupt: | |
pass | |
pool.shutdown() | |
pool.wait() | |
multi_progress_thread([('x', i) for i in range(10)], func=example_func) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment