Created
February 7, 2017 16:04
-
-
Save deliro/d121df87b05ecffc7010e67b7231220b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse
import csv
import os
import re
import shlex
import subprocess
import sys
from collections import OrderedDict, namedtuple
from datetime import datetime, timedelta
# Immutable (start, end) pair describing when a subtitle is on screen.
Timing = namedtuple('Timing', ['start', 'end'])


class Subtitle:
    """A single subtitle cue: sequence number, display interval and text."""

    __slots__ = ['number', 'timing', 'text']

    def __init__(self, number, timing, text):
        """Store the cue fields.

        ``timing`` is a two-item iterable ``(start, end)``; it is unpacked
        into a ``Timing`` named tuple.
        """
        self.number = number
        self.text = text
        self.timing = Timing(*timing)

    def get_duration(self):
        """Return how long the cue is displayed (``end - start``)."""
        interval = self.timing
        return interval.end - interval.start

    def __str__(self):
        """Return the cue text."""
        return self.text
class SyncedSubtitle:
    """Pair of subtitles that were matched to each other.

    Iterating an instance yields ``sub_en`` first and ``sub_ru`` second, so
    it can be unpacked as ``first, second = synced``.
    """

    def __init__(self, sub_en, sub_ru):
        self.sub_en, self.sub_ru = sub_en, sub_ru

    def __iter__(self):
        yield from (self.sub_en, self.sub_ru)
class SubtitleParser:
    """Parse SubRip (``.srt``) data into a list of ``Subtitle`` objects."""

    # Cue blocks are separated by one or more blank (or whitespace-only) lines.
    _BLOCK_SEPARATOR = re.compile(r'\n\s*\n')
    # Inline markup such as <i>...</i> or <font color=...>.
    _TAG = re.compile(r'<[^>]*>')

    @classmethod
    def parse_file(cls, srt_file, encoding=None):
        """Read ``srt_file`` and return the parsed subtitles.

        ``encoding`` is passed to ``open``; ``None`` keeps the platform
        default, which is what the file was read with before this parameter
        existed.
        """
        with open(srt_file, 'r', encoding=encoding) as fp:
            return cls().parse(fp.read())

    def parse(self, srt_data):
        """Split ``srt_data`` into cue blocks and parse each non-empty one."""
        # Normalise line endings so strings that did not come through a
        # text-mode file (e.g. raw CRLF data) split correctly.
        srt_data = srt_data.replace('\r\n', '\n').replace('\r', '\n')
        parsed = (self.parse_block(block)
                  for block in self._BLOCK_SEPARATOR.split(srt_data))
        return [sub for sub in parsed if sub is not None]

    def parse_block(self, block):
        """Parse one cue block; return ``None`` for a blank block.

        Raises ``ValueError`` when the block has no timing line or the
        timing line is malformed.
        """
        if not block.strip():
            return None
        number, timing, text = self._split_block(block)
        return Subtitle(
            number=number, text=self.parse_text(text),
            timing=self.parse_timing(timing))

    @staticmethod
    def _split_block(block):
        """Return ``(number, timing_line, text)`` for one cue block.

        Every line after the timing line belongs to the text, so multi-line
        cues are kept whole (joined with newlines) instead of being cut off
        after their first line.
        """
        number, timing, *text_lines = block.strip().split('\n')
        # A UTF-8 byte-order mark may be glued to the very first number.
        return number.strip().lstrip('\ufeff'), timing, '\n'.join(text_lines)

    def parse_text(self, text):
        """Return cue text as one line without markup or leading dots."""
        # Tags go first so that an ellipsis hidden behind an opening tag
        # ("<i>...text</i>") is trimmed as well.
        flat = self._TAG.sub('', text).replace('\n', ' ').strip()
        return flat.lstrip('.').strip()

    def parse_timing(self, timing):
        """Parse ``"start --> end"`` into a pair of ``timedelta`` values."""
        start, end = timing.split('-->')
        return self.parse_time(start), self.parse_time(end)

    def parse_time(self, time_str):
        """Convert ``HH:MM:SS,mmm`` (or ``HH:MM:SS.mmm``) to a ``timedelta``."""
        # strptime has no directive for a comma before the fraction, and it
        # rejects surrounding whitespace, so normalise both first.
        time_str = time_str.strip().replace(',', '.')
        t = datetime.strptime(time_str, '%H:%M:%S.%f')
        return timedelta(hours=t.hour, minutes=t.minute,
                         seconds=t.second, microseconds=t.microsecond)
class SubtitleSquasher:
    """Pair cues from two subtitle tracks by time and merge them into cards."""

    # Two cues are paired when the gap between them is smaller than this;
    # a negative gap means the cues overlap.
    SYNC_EDGE = timedelta(microseconds=200000)
    # Accumulated duration both buffers must reach before a card is emitted.
    MIN_CARD_DURATION = timedelta(seconds=5)

    @classmethod
    def run(cls, *args, **kwargs):
        """Shortcut for ``cls().sync(*args, **kwargs)``."""
        return cls().sync(*args, **kwargs)

    def sync(self, en_subs, ru_subs, offset=None):
        """Pair every cue of ``en_subs`` with a cue of ``ru_subs`` and squash.

        ``offset`` (seconds), when truthy, is added to the timing of every
        cue in ``ru_subs`` before matching; the cues are shifted in place.
        A cue from ``en_subs`` without a close enough partner is paired with
        an empty placeholder cue that has the same timing.

        Returns the list of ``SyncedSubtitle`` cards produced by ``squash``.
        """
        if offset:
            self.add_offset(ru_subs, timedelta(seconds=offset))
        synced = []
        for en_sub in en_subs:
            # First candidate close enough in time wins.
            ru_pair = next(
                (ru_sub for ru_sub in ru_subs
                 if self._gap(en_sub, ru_sub) < self.SYNC_EDGE),
                None)
            if ru_pair is None:
                ru_pair = Subtitle(
                    timing=(en_sub.timing.start, en_sub.timing.end),
                    number=None, text='')
            synced.append(SyncedSubtitle(en_sub, ru_pair))
        return self.squash(synced)

    @staticmethod
    def _gap(first, second):
        """Return the time between two cues (negative when they overlap)."""
        return (max(first.timing.start, second.timing.start)
                - min(first.timing.end, second.timing.end))

    def add_offset(self, subs, offset):
        """Shift the timing of every cue in ``subs`` by ``offset`` in place.

        ``timing`` is a named tuple and therefore immutable: its fields
        cannot be assigned to, so a shifted copy replaces it.
        """
        for sub in subs:
            sub.timing = sub.timing._replace(
                start=sub.timing.start + offset,
                end=sub.timing.end + offset)

    def squash(self, synced):
        """Merge consecutive pairs into cards of at least the minimum length.

        Pairs are buffered until both sides have accumulated
        ``MIN_CARD_DURATION`` and the latest cue on each side ends a
        sentence; the buffer is then merged into one ``SyncedSubtitle``.
        Whatever remains buffered at the end becomes a final card.
        """
        result, buffer_en, buffer_ru = [], [], []
        # Running totals instead of re-summing the buffers on every step.
        duration_en = duration_ru = timedelta()
        for item in synced:
            buffer_en.append(item.sub_en)
            buffer_ru.append(item.sub_ru)
            duration_en += item.sub_en.get_duration()
            duration_ru += item.sub_ru.get_duration()
            long_enough = (duration_en >= self.MIN_CARD_DURATION
                           and duration_ru >= self.MIN_CARD_DURATION)
            if long_enough \
                    and self.is_complete_text(item.sub_en.text) \
                    and self.is_complete_text(item.sub_ru.text):
                result.append(self._flush(buffer_en, buffer_ru))
                buffer_en, buffer_ru = [], []
                duration_en = duration_ru = timedelta()
        if buffer_en and buffer_ru:
            result.append(self._flush(buffer_en, buffer_ru))
        return result

    def _flush(self, buffer_en, buffer_ru):
        """Turn the two buffers into a single merged ``SyncedSubtitle``."""
        # The same second-track cue can be paired with several first-track
        # cues; it must appear only once in the merged text.
        return SyncedSubtitle(
            self.merge(buffer_en), self.merge(self.unique(buffer_ru)))

    def merge(self, cards):
        """Combine cues into one spanning first start to last end.

        A single cue is returned unchanged. ``cards`` must not be empty.
        """
        if len(cards) == 1:
            return cards[0]
        number = '{}-{}'.format(cards[0].number, cards[-1].number)
        timing = (cards[0].timing.start, cards[-1].timing.end)
        text = ' '.join(c.text for c in cards)
        return Subtitle(number=number, timing=timing, text=text)

    def unique(self, subs):
        """Return ``subs`` without repeated cue numbers, order preserved."""
        seen = set()
        result = []
        for sub in subs:
            if sub.number not in seen:
                seen.add(sub.number)
                result.append(sub)
        return result

    def is_complete_text(self, text):
        """Tell whether ``text`` ends a sentence (but not with an ellipsis)."""
        return text.endswith(('.', '!', '?')) and not text.endswith('...')
class FFmpegError(Exception):
    """Raised by ``MediaExtractor.run_command`` when the command reports an error."""
class MediaExtractor:
    """Extract periodic screenshots from a video and hand them out by time."""

    # Seconds between two consecutive screenshots; has to agree with the
    # ``fps=1/10`` filter in the ffmpeg command below.
    SCREENSHOT_INTERVAL = 10
    # Files written by ffmpeg for the ``%d.jpg`` output pattern.
    _SCREENSHOT_NAME = re.compile(r'(\d+)\.jpg')

    def __init__(self, media_dir):
        # Maps a time in seconds to a screenshot file name, in ascending
        # time order; entries are removed as they are handed out.
        self.screenshot_mapping = OrderedDict()
        self.media_dir = media_dir

    def mkdir(self):
        """Create ``media_dir``; return ``False`` if it already exists."""
        try:
            os.mkdir(self.media_dir)
            return True
        except FileExistsError:
            return False

    @staticmethod
    def _build_argv(command, **kwargs):
        """Turn a command template into an argument list.

        The template is tokenised first and the placeholders are filled in
        afterwards, so a substituted value always stays a single argument
        no matter which spaces, quotes or ``$`` characters it contains.
        """
        return [token.format(**kwargs) for token in shlex.split(command)]

    def run_command(self, command, **kwargs):
        """Run the ``command`` template with ``kwargs`` substituted.

        The command is executed directly, without a shell, so shell syntax
        (pipes, redirections, variable expansion) in the template is not
        interpreted.

        Raises ``FFmpegError`` when the process exits with a non-zero status
        or writes anything to stderr.
        """
        result = subprocess.run(
            self._build_argv(command, **kwargs), stderr=subprocess.PIPE)
        if result.returncode != 0 or result.stderr:
            message = result.stderr.decode('utf-8', errors='replace')
            raise FFmpegError(
                message or 'command exited with status {}'.format(
                    result.returncode))

    def extract_screenshots(self, source_file):
        """Populate ``screenshot_mapping`` with screenshots of ``source_file``.

        ffmpeg only runs when ``media_dir`` had to be created; an existing
        directory is reused as is. Afterwards every ``<number>.jpg`` file in
        the directory is registered under ``number * SCREENSHOT_INTERVAL``.
        """
        if self.mkdir():
            self.run_command(
                'ffmpeg -y -i "{source}" -vf scale=320:-1,fps=1/10 -qscale:v 10 -loglevel fatal "{output}"',
                source=source_file, output=os.path.join(self.media_dir, '%d.jpg'))
        # Look at the actual screenshot files rather than the raw entry
        # count: that keeps the last frame and ignores unrelated files.
        matches = map(self._SCREENSHOT_NAME.fullmatch,
                      os.listdir(self.media_dir))
        frames = sorted((int(m.group(1)), m.group(0)) for m in matches if m)
        self.screenshot_mapping = OrderedDict(
            (number * self.SCREENSHOT_INTERVAL, name)
            for number, name in frames)

    def get_screenshot(self, start_time):
        """Return the first unused screenshot at or after ``start_time``.

        ``start_time`` is a ``timedelta``. The screenshot is removed from
        the mapping, so each one is handed out at most once. Returns
        ``None`` when no screenshot is left for that time.
        """
        # NOTE(review): the result is "<media_dir>.<file>", not a path inside
        # media_dir - presumably the naming expected by the card importer;
        # confirm before changing.
        seconds = start_time.total_seconds()
        for key, value in self.screenshot_mapping.items():
            if key >= seconds:
                # Safe: the loop is left right after the deletion.
                del self.screenshot_mapping[key]
                return '{}.{}'.format(self.media_dir, value)
        return None
def main(args):
    """Build ``<name>.csv`` with one flashcard per row.

    ``args`` is the parsed command line: ``front`` and ``back`` are subtitle
    files, ``name`` is the deck name (used for the CSV file and the
    screenshot directory), ``offset`` is an optional shift in seconds applied
    to the back-side subtitles, and ``screenshots`` is an optional video file
    to take screenshots from.

    Each row holds the front text, the back text and, when screenshots were
    requested, an ``<img>`` tag (empty when no screenshot is left).
    """
    front_subs = SubtitleParser.parse_file(args.front)
    back_subs = SubtitleParser.parse_file(args.back)
    media = MediaExtractor(args.name)
    if args.screenshots:
        media.extract_screenshots(source_file=args.screenshots)
    cards = SubtitleSquasher.run(front_subs, back_subs, offset=args.offset)
    # All inputs are processed before the CSV is opened, so a failure above
    # does not leave a truncated output file behind. ``newline=''`` is what
    # the csv module requires of its file object; UTF-8 keeps non-ASCII
    # subtitle text independent of the platform's default encoding.
    with open(args.name + '.csv', 'w', newline='', encoding='utf-8') as output:
        writer = csv.writer(output)
        for front, back in cards:
            row_data = [front.text, back.text if back else '']
            if args.screenshots:
                screenshot_file = media.get_screenshot(front.timing.start)
                # Keep the column count stable when the screenshots run out
                # instead of emitting src="None".
                row_data.append(
                    '<img src="{}" />'.format(screenshot_file)
                    if screenshot_file else '')
            writer.writerow(row_data)
    sys.stdout.write('Done\n')
def build_arg_parser():
    """Return the command-line parser for this script.

    ``--front``, ``--back`` and ``--name`` are mandatory because the script
    cannot do anything without them; ``--offset`` accepts fractional seconds.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--front', dest='front', required=True,
                        help='Subtitle file for the front side of cards')
    parser.add_argument('-b', '--back', dest='back', required=True,
                        help='Subtitle file for the back side of cards')
    parser.add_argument('-o', '--offset', dest='offset', type=float,
                        help='Subtitle offset')
    parser.add_argument('-n', '--name', dest='name', required=True,
                        help='Deck name')
    parser.add_argument('-s', '--screenshots', dest='screenshots',
                        help='Video file to extract screenshots from')
    return parser


if __name__ == '__main__':
    main(build_arg_parser().parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment