Skip to content

Instantly share code, notes, and snippets.

@xjasonlyu
Last active July 23, 2021 09:16
Show Gist options
  • Save xjasonlyu/5aae86d3f4875849175d6be2b6ccc08c to your computer and use it in GitHub Desktop.
Save xjasonlyu/5aae86d3f4875849175d6be2b6ccc08c to your computer and use it in GitHub Desktop.
VideoSubFinder OCR using Google Cloud
import os
import re
import sys
import logging
from io import StringIO
from concurrent import futures
from google.cloud import vision
IMAGE_EXT = ('.jpg', '.jpeg', '.png')
logging.basicConfig(level=logging.INFO)
def concurrentMap(fn, *args, timeout=None, max_workers=None):
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
return tuple(executor.map(fn, *args, timeout=timeout))
def detect_text(path: str) -> str:
"""Detects text in the file."""
with open(path, 'rb') as image_file:
content = image_file.read()
image = vision.Image(content=content)
client = vision.ImageAnnotatorClient()
response = client.text_detection(image=image)
if response.error.message:
raise Exception(
'{}\nFor more info on error messages, check: '
'https://cloud.google.com/apis/design/errors'.format(
response.error.message))
return response.full_text_annotation.text
def detect(path: str):
if os.path.isdir(path):
logging.info('Ignore directory: {}'.format(path))
return
ext = os.path.splitext(path)[-1]
if ext not in IMAGE_EXT:
logging.info('Ignore non-image file: {}'.format(path))
return
filename = os.path.splitext(path)[0]+'.txt'
if os.path.exists(filename):
logging.info('Ignore detected image: {}'.format(path))
return
text = detect_text(path)
logging.info('Detect image: {}: {}'.format(path, repr(text)))
# save text to file
with open(filename, 'w', encoding='utf-8') as f:
f.write(text)
def create_srt(filename: str, image_dir: str):
r = re.compile(
r'^(\d)_(\d\d)_(\d\d)_(\d\d\d)__(\d)_(\d\d)_(\d\d)_(\d\d\d)_\d+\.txt$')
files = [i for i in os.listdir(image_dir) if r.match(i)]
files.sort()
with StringIO() as s:
for i, f in enumerate(files):
result = r.findall(f)[0]
with open(os.path.join(image_dir, f), encoding='utf-8') as fp:
content = fp.read()
s.write('''{}
{}:{}:{},{} --> {}:{}:{},{}
{}
'''.format(i+1, *result, content))
srt_file = os.path.join(image_dir, filename)
if os.path.exists(srt_file):
logging.info('SRT file exists')
return
with open(srt_file, 'w', encoding='utf-8') as fp:
fp.write(s.getvalue())
logging.info('Create srt file: {}'.format(filename))
def main():
if len(sys.argv) != 2:
print('Usage: python3 {} <image_dir>'.format(os.path.basename(__file__)))
sys.exit(1)
image_dir = sys.argv[1]
items = os.listdir(image_dir)
items.sort()
_ = concurrentMap(detect, [os.path.join(image_dir, i)
for i in items], max_workers=2)
# create_srt('123.srt', image_dir)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment