Last active
December 1, 2020 00:41
-
-
Save bertsky/9cd1a6ed9c2f3925a5339ed52f9be458 to your computer and use it in GitHub Desktop.
Proof of concept for an OCR-D workflow engine that uses strong integration of processors (Python API instead of CLI) and acts as a server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import re | |
import click | |
import json | |
from time import time | |
import flask | |
from distutils.spawn import find_executable as which | |
import ocrd | |
import ocrd.decorators | |
from ocrd_utils import getLogger, initLogging | |
from ocrd.processor.base import run_cli | |
from ocrd.resolver import Resolver | |
import ocrd.task_sequence | |
from ocrd_utils.logging import setOverrideLogLevel | |
# workaround venvs created for Python>=3.8 | |
from pkg_resources import load_entry_point | |
from ocrd.decorators import ocrd_loglevel | |
# :::stolen from ocrd.cli.process::: | |
# ---------------------------------------------------------------------- | |
# ocrd workflow | |
# ---------------------------------------------------------------------- | |
# NOTE: the METS path, page IDs and the overwrite flag are not CLI options
# here; they are passed per request as query parameters of ``/process``
# (``mets``, ``page_id``, ``overwrite``).
@click.command('workflow')
@ocrd_loglevel
@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1')
@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024))
@click.argument('tasks', nargs=-1, required=True)
def workflow_cli(log_level, host, port, tasks):
    """
    Start a server processing a series of tasks with strong Python integration
    """
    # The docstring above is shown by click as the command's help text,
    # so further developer documentation lives in comments instead.
    #
    # Arguments (filled in by the click decorators):
    #   log_level -- passed through unchanged to run_tasks() on each request
    #   host/port -- address the Flask development server listens at
    #   tasks     -- task strings, parsed and instantiated once at startup
    initLogging()
    log = getLogger('ocrd.workflow')
    log.debug("Parsing and instantiating %d tasks", len(tasks))
    # Done once, before serving: all requests share the same task objects.
    tasks = prepare_tasks(tasks)
    app = flask.Flask(__name__)

    @app.route('/process')
    def process():  # pylint: disable=unused-variable
        # Handle one request: run the prepared tasks on the given METS.
        args = flask.request.args
        mets = args.get("mets")
        if not mets:
            # Client error: report it via the HTTP status, not only the body.
            return 'Error: No METS', 400
        # A missing or empty page_id is passed on as the empty string.
        page_id = args.get('page_id') or ''
        # Only these literal spellings enable overwrite; anything else
        # (including a missing parameter) disables it.
        overwrite = args.get('overwrite') in ["True", "true", "1"]
        try:
            run_tasks(mets, log_level, page_id, tasks, overwrite)
        except Exception as e:
            # Top-level boundary of the request: log with traceback, then
            # tell the client that processing failed (server-side error).
            log.exception("Request '%s' failed", str(args))
            return 'Failed: %s' % str(e), 500
        return 'Finished'

    log.debug("Running server on http://%s:%d", host, port)
    app.run(host=host, port=port)
# :::stolen from ocrd.task_sequence::: | |
def prepare_tasks(task_strs):
    """
    Parse task strings into tasks and try to instantiate each of them.

    Args:
        task_strs (iterable of str): one task description per element,
            in the format accepted by ``ProcessorTask.parse``

    Returns:
        list: the parsed ``ProcessorTask`` objects, in input order; the
        result of each ``instantiate()`` call is not returned.
    """
    # Parse everything first, so a malformed task string is reported
    # before any processor gets instantiated.
    tasks = [ProcessorTask.parse(task_str) for task_str in task_strs]
    for task in tasks:
        # Called for its side effect on the task only; the return value
        # is intentionally ignored.
        task.instantiate()
    return tasks
def run_tasks(mets, log_level, page_id, tasks, overwrite=False):
    """
    Run all tasks in order on the workspace of the given METS.

    Tasks that carry a processor object (``task.instance``) are executed
    in-process via ``run_api``; all others are executed via ``run_cli``.
    The METS is reloaded after a CLI step and saved before a CLI step
    that follows an API step, as well as after a final API step.

    Args:
        mets (str): METS URL/path, resolved into a workspace
        log_level: if truthy, set as override log level
        page_id (str): page ID(s) passed on to every task
        tasks (list): prepared task objects
        overwrite (bool): enable overwrite mode on the workspace and
            pass the flag on to validation and CLI steps

    Raises:
        Exception: if any task finishes with a non-zero return code
    """
    log = getLogger('ocrd.task_sequence.run_tasks')
    resolver = Resolver()
    workspace = resolver.workspace_from_url(mets)
    if overwrite:
        workspace.overwrite_mode = True
    if log_level:
        setOverrideLogLevel(log_level)
    ocrd.task_sequence.validate_tasks(tasks, workspace, page_id, overwrite)

    # How the previous task was executed:
    # None = no task has run yet, True = via API, False = via CLI.
    previous_via_api = None
    for task in tasks:
        via_api = bool(task.instance)
        log.info("Start processing %s task '%s'",
                 "API" if via_api else "CLI", task)
        if previous_via_api is not None:
            if not previous_via_api:
                # previous step ran as CLI: pick up its METS from disk
                workspace.reload_mets()
            elif not via_api:
                # switching from API to CLI: write the in-memory METS out
                workspace.save_mets()
        file_grps = dict(
            input_file_grp=','.join(task.input_file_grps),
            output_file_grp=','.join(task.output_file_grps),
        )
        if via_api:
            # execute via API (in-process)
            returncode = run_api(
                task.instance,
                workspace,
                page_id=page_id,
                **file_grps
            )
        else:
            # execute via CLI
            returncode = run_cli(
                task.executable,
                mets,
                resolver,
                workspace,
                log_level=log_level,
                page_id=page_id,
                overwrite=overwrite,
                parameter=json.dumps(task.parameters),
                **file_grps
            )
        # a non-zero return code aborts the whole sequence
        if returncode != 0:
            raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode))
        log.info("Finished processing task '%s'", task)
        previous_via_api = via_api
    if previous_via_api:
        # the last step ran via API: its changes are not saved yet
        workspace.save_mets()
# Module-level slot through which the patched ``ocrd_cli_wrap_processor``
# hands the processor class back to ``ProcessorTask.instantiate``.
processor_class = None


# amend ProcessorTask by object instance if possible
class ProcessorTask(ocrd.task_sequence.ProcessorTask):
    """
    Task that additionally holds a processor object, if one can be created.

    ``instance`` stays ``None`` unless ``instantiate()`` succeeds.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # processor object for in-process execution (None: not available)
        self.instance = None

    # override class method to give us our objects
    @classmethod
    def parse(cls, argstr):
        """
        Parse a task string by delegating to the base class.

        NOTE(review): presumably the base implementation builds its result
        via ``cls``, so this yields objects of this class -- confirm
        against ``ocrd.task_sequence.ProcessorTask.parse``.
        """
        # Zero-argument super() resolves relative to this class; the former
        # super(cls, cls) would resolve back to this very method (endless
        # recursion) as soon as ``cls`` is a further subclass.
        return super().parse(argstr)

    def instantiate(self):
        """
        Try to create the processor object without running its CLI.

        If the executable is a script with a Python shebang line, its code
        is executed in-process while ``ocrd_cli_wrap_processor``,
        ``sys.exit`` and ``sys.argv`` are temporarily replaced, so that the
        script merely reveals its processor class. That class is then
        instantiated with this task's parameters and stored in
        ``self.instance``.

        Returns:
            ``None`` if the executable is not found in PATH, otherwise
            ``True`` if ``self.instance`` is set and ``False`` if not.
        """
        global processor_class
        logger = getLogger('ocrd.task_sequence.ProcessorTask')
        program = which(self.executable)
        if not program:
            logger.warning("Cannot find processor '%s' in PATH", self.executable)
            return
        with open(program) as f:
            line = f.readline().strip()
            if re.fullmatch('[#][!].*/python[0-9.]*', line):
                # compile the rest of the script (everything after the shebang)
                code = compile(f.read(), program, 'exec')
            else:
                code = None
        if code is None:
            logger.info("Non-Pythonic processor '%s' breaks the chain", self.executable)
            return bool(self.instance)

        # override modules
        def ignore(*_args, **_kwargs):
            # stand-in for sys.exit: the script must not terminate us
            return

        def get_processor_class(cls, **kwargs):
            # stand-in for ocrd_cli_wrap_processor: remember the class only
            global processor_class
            processor_class = cls

        processor_class = None
        wrap_processor = ocrd.decorators.ocrd_cli_wrap_processor
        sys_exit = sys.exit
        sys_argv = sys.argv
        ocrd.decorators.ocrd_cli_wrap_processor = get_processor_class
        sys.exit = ignore
        sys.argv = [self.executable]
        try:
            # run CLI merely to fetch class; the local __name__ makes the
            # script's ``if __name__ == '__main__'`` guard fire
            __name__ = '__main__'
            exec(code)
        finally:
            # reset modules -- always, even if the script raised, so the
            # process is not left with a disabled sys.exit etc.
            sys.argv = sys_argv
            sys.exit = sys_exit
            ocrd.decorators.ocrd_cli_wrap_processor = wrap_processor

        if processor_class is None:
            # the script never called the (patched) wrapper, so there is
            # nothing to instantiate; self.instance stays unset
            logger.warning("Processor '%s' did not reveal its processor class",
                           self.executable)
        else:
            logger.info("Instantiating %s for processor '%s'",
                        processor_class.__name__, self.executable)
            self.instance = processor_class(None, parameter=self.parameters)
            # circumvent calling CLI to get .ocrd_tool_json
            self._ocrd_tool_json = self.instance.ocrd_tool
        return bool(self.instance)
# :::partly stolen from ocrd.processor.run_processor::: | |
# :::partly stolen from ocrd.decorators.ocrd_cli_wrap_processor::: | |
def run_api(processor,
            workspace,
            page_id=None,
            input_file_grp=None,
            output_file_grp=None
):  # pylint: disable=too-many-locals
    """
    Set workspace in processor and run through it

    The processor is run with the workspace directory as current working
    directory; the previous working directory is restored afterwards. On
    success, an agent entry for the processor is added to the workspace's
    METS. The METS is not saved here.

    Args:
        processor (object): Processor instance
        workspace (object): Workspace instance
        page_id (str): page ID(s) to process; a falsy value is stored
            on the processor as ``None``
        input_file_grp (str): input file group(s) to set on the processor
        output_file_grp (str): output file group(s) to set on the processor

    Returns:
        int: 0 on success, 1 if ``processor.process()`` (or changing into
        the workspace directory) raised an exception
    """
    log = getLogger('ocrd.processor.helpers.run_api')
    log.debug("Running processor %s", processor.__class__.__name__)
    processor.page_id = page_id if page_id else None
    processor.workspace = workspace
    processor.input_file_grp = input_file_grp
    processor.output_file_grp = output_file_grp
    ocrd_tool = processor.ocrd_tool
    parameter = processor.parameter
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool['steps'][0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0 = time()
    # Fetched before entering ``try``: if this lookup itself fails, the
    # ``finally`` clause must not run with ``oldcwd`` still unbound.
    oldcwd = os.getcwd()
    try:
        os.chdir(processor.workspace.directory)
        processor.process()
    except Exception:
        # report failure via return code, like a CLI call would
        log.exception("Failure in processor '%s'", ocrd_tool['executable'])
        return 1
    finally:
        os.chdir(oldcwd)
    t1 = time() - t0
    logProfile.info(
        "Executing processor '%s' took %fs [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s']",
        ocrd_tool['executable'],
        t1,
        input_file_grp if input_file_grp else '',
        output_file_grp if output_file_grp else '',
        json.dumps(parameter) if parameter else '{}'
    )
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole
    )
    # we don't workspace.save_mets() here
    return 0
# Script entry point: start the workflow server CLI when executed directly.
if __name__ == "__main__":
    workflow_cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Superseded by OCR-D/core#652