Last active
January 24, 2019 21:31
-
-
Save agrif/d28de6a3dcc6af5b1cf37c64dc5db649 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import json | |
import os | |
import random | |
import tempfile | |
import pysubs2 | |
import attr | |
import whoosh.fields | |
import whoosh.index | |
import whoosh.qparser | |
import click | |
@attr.s
class FFmpeg:
    """Small wrapper that shells out to an ffmpeg executable.

    ``cmd`` is the program name (or path) placed first on every command line.
    """

    cmd = attr.ib()

    def run(self, *args):
        """Run ffmpeg with ``args`` appended to a fixed set of quiet flags.

        Returns the captured standard output as bytes. A non-zero exit status
        surfaces as ``subprocess.CalledProcessError``.
        """
        # -nostdin / -y keep ffmpeg non-interactive; the remaining flags
        # silence the banner and log output.
        command = [self.cmd, '-nostdin', '-y', '-hide_banner', '-loglevel', 'panic']
        command.extend(args)
        return subprocess.check_output(command)

    def read_subs(self, path):
        """Have ffmpeg emit the subtitles of ``path`` as ASS and parse them.

        Returns the ``pysubs2.SSAFile`` built from ffmpeg's UTF-8 decoded
        output.
        """
        raw = self.run('-i', path, '-f', 'ass', '-')
        text = raw.decode('utf-8')
        return pysubs2.SSAFile.from_string(text)

    def get_image(self, path, start, time, outpath):
        """Write a single frame of ``path`` with subtitles rendered to ``outpath``.

        ``start`` and ``time`` are divided by 1000 before being passed to
        ffmpeg's ``-ss`` option (presumably they are milliseconds -- confirm
        against callers). The ``subtitles`` filter is tried first; if ffmpeg
        fails, the subtitle stream is overlaid on the video stream instead.
        """
        # Arguments shared by both attempts: seek on input, keep timestamps,
        # then seek again on output.
        head = ['-ss', str(start / 1000), '-i', path, '-copyts', '-ss', str(time / 1000)]
        tail = ['-vframes', '1', '-f', 'image2', outpath]

        # Escape the characters that are special inside the filter argument.
        escaped = path.replace("'", r"\'").replace(':', r'\:')
        subtitle_filter = "subtitles='{}'".format(escaped)

        try:
            self.run(*head, '-filter_complex', subtitle_filter, *tail)
        except subprocess.CalledProcessError:
            self.run(*head, '-filter_complex', '[0:v][0:s]overlay[v]', '-map', '[v]', *tail)
@attr.s
class Result:
    """A single search hit: one subtitle event and the file it belongs to."""

    # Field order matters: attrs generates the positional __init__ from it.
    path = attr.ib()
    content = attr.ib()
    start = attr.ib()
    end = attr.ib()

    @property
    def midpoint(self):
        """Return the value halfway between ``start`` and ``end``."""
        total = self.start + self.end
        return total / 2
@attr.s
class Database:
    """A whoosh index of subtitle events plus a small JSON config file.

    Attributes:
        path: directory holding the index and ``subsearch-config.json``.
        ix: the whoosh index object.
        relative: default for whether file paths are stored relative to
            ``path`` (True) or as absolute paths (False).
    """

    path = attr.ib()
    ix = attr.ib()
    relative = attr.ib()

    @classmethod
    def create(cls, path, relative=False):
        """Create a new index in ``path`` and return a ``Database`` for it.

        The directory is created if missing, and the ``relative`` setting is
        saved to ``subsearch-config.json`` inside it.
        """
        schema = whoosh.fields.Schema(
            path=whoosh.fields.ID(stored=True),
            start=whoosh.fields.NUMERIC(stored=True),
            end=whoosh.fields.NUMERIC(stored=True),
            content=whoosh.fields.TEXT(stored=True),
        )
        config = dict(
            relative=relative,
        )
        os.makedirs(path, exist_ok=True)
        ix = whoosh.index.create_in(path, schema)
        with open(os.path.join(path, 'subsearch-config.json'), 'w') as f:
            json.dump(config, f)
        return cls(path, ix, relative)

    @classmethod
    def open(cls, path):
        """Open the existing index in ``path`` using its saved config."""
        ix = whoosh.index.open_dir(path)
        with open(os.path.join(path, 'subsearch-config.json')) as f:
            config = json.load(f)
        # The config keys are passed straight through as constructor keywords.
        return cls(path, ix, **config)

    def search(self, query, **kwargs):
        """Yield a ``Result`` for every hit of ``query`` on the content field.

        Extra keyword arguments are forwarded to the whoosh searcher's
        ``search`` call. Each stored path is joined onto the database
        directory and normalised, so relative entries resolve against it.
        """
        q = whoosh.qparser.QueryParser("content", self.ix.schema).parse(query)
        with self.ix.searcher() as searcher:
            for r in searcher.search(q, **kwargs):
                d = dict(r)
                d['path'] = os.path.normpath(os.path.join(self.path, d['path']))
                yield Result(**d)

    def add_recursive(self, ff, path, **kwargs):
        """Call ``add`` on every entry of directory ``path``, in sorted order."""
        for d in sorted(os.listdir(path)):
            full = os.path.join(path, d)
            self.add(ff, full, **kwargs)

    def add(self, ff, path, report=None, relative=None):
        """Index the subtitles of ``path``, descending into directories.

        Args:
            ff: object whose ``read_subs(path)`` returns the parsed subtitles.
            path: file or directory to add.
            report: optional callable; it receives the path as stored in the
                index, and an error message if subtitle extraction fails.
            relative: overrides the database's ``relative`` default when not
                None.

        A file whose subtitles cannot be extracted
        (``subprocess.CalledProcessError``) is reported and skipped.
        """
        if relative is None:
            relative = self.relative
        if os.path.isdir(path):
            return self.add_recursive(ff, path, report=report, relative=relative)

        # Keep the path as given for ffmpeg; ``path`` becomes the stored form.
        realpath = path
        if relative:
            path = os.path.normpath(os.path.relpath(path, self.path))
        else:
            path = os.path.abspath(path)
        if report:
            report(path)

        try:
            subs = ff.read_subs(realpath)
        except subprocess.CalledProcessError:
            if report:
                report("!!! Error extracting subtitles...")
            return

        # Using the writer as a context manager commits on success and cancels
        # on an exception, so a failed add_document cannot leave the index
        # write lock held.
        with self.ix.writer() as writer:
            for ev in subs.events:
                if ev.is_comment:
                    continue
                writer.add_document(path=path, start=ev.start, end=ev.end, content=ev.plaintext)
# Root click group; the subcommands below attach to it via @cli.command().
# Documented with comments only, because click would show a docstring as
# --help text.
@click.group()
def cli():
    return None
# "init" subcommand: create a new, empty database in DBPATH.
# --relative/--absolute chooses the database's default path storage mode.
@cli.command()
@click.option('--relative/--absolute', '-r/-a', is_flag=True)
@click.argument('dbpath', type=click.Path())
def init(dbpath, relative):
    Database.create(path=dbpath, relative=relative)
# "add" subcommand: index the subtitles of each PATHS entry into DBPATH.
# If neither --relative nor --absolute is given, `relative` stays None and the
# database's own default applies.
@cli.command()
@click.option('--relative/--absolute', '-r/-a', is_flag=True, default=None)
@click.argument('dbpath', type=click.Path())
@click.argument('paths', nargs=-1, type=click.Path(exists=True))
def add(dbpath, paths, relative):
    def report(message):
        # Progress callback handed to Database.add.
        click.echo('adding: {}'.format(message))

    database = Database.open(dbpath)
    ffmpeg = FFmpeg('ffmpeg')
    for target in paths:
        database.add(ffmpeg, target, report=report, relative=relative)
# "search" subcommand: run QUERY against DBPATH and print one random hit.
# --image PATH also writes the subtitled frame at the hit's midpoint to PATH.
# --upload posts the frame with curl and prints the returned URL; without
# --image the frame goes to a temporary directory.
@cli.command()
@click.option('--image', '-i', type=click.Path())
@click.option('--upload', '-u', is_flag=True)
@click.argument('dbpath', type=click.Path())
@click.argument('query', nargs=-1)
def search(dbpath, query, upload=False, image=None):
    query = ' '.join(query)
    database = Database.open(dbpath)
    ffmpeg = FFmpeg('ffmpeg')

    hits = list(database.search(query))
    if not hits:
        # Nothing matched: stay silent.
        return

    hit = random.choice(hits)

    def render(destination):
        # Write the frame at the middle of the chosen subtitle event.
        ffmpeg.get_image(hit.path, hit.start, hit.midpoint, destination)

    def do_upload(imgpath):
        form_field = 'file=@{}'.format(imgpath)
        response = subprocess.check_output(['curl', '-s', '-F', form_field, 'http://0x0.st'])
        url = response.decode('utf-8').strip()
        click.echo('Url: {}'.format(url))

    click.echo('Path: {}'.format(hit.path))
    click.echo('Time: {:.03f} - {:.03f}'.format(hit.start / 1000, hit.end / 1000))
    click.echo('Content:')
    for line in hit.content.splitlines():
        click.echo(' ' + line)

    if image:
        render(image)

    if not upload:
        return

    if image:
        # The frame was already written to the requested path; reuse it.
        do_upload(image)
        return

    with tempfile.TemporaryDirectory(prefix='subsearch.') as workdir:
        imgpath = os.path.join(workdir, 'out.png')
        render(imgpath)
        do_upload(imgpath)
# Run the click command group only when executed as a script, not on import.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment