Dependencies: `pip install click requests editdistance kraken lxml pillow-simd sickle`
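"""Align DTA (Deutsches Textarchiv) transcriptions with kraken OCR output.

For each volume the script reads the DTA plain-text dump, downloads the
facsimile images, runs kraken (binarization, segmentation, recognition
with a Fraktur model) over every page and substitutes the recognized
lines with the ground-truth transcription wherever the two match by
normalized edit distance. One hOCR file is written per volume.

Written for Python 2 (uses `unicode` and list-returning `dict.keys()`).
"""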
from __future__ import division

import logging
import os
import re
from collections import OrderedDict
from io import BytesIO

import click
import requests
from editdistance import eval as levenshtein
from kraken import binarization, pageseg, rpred, serialization
from kraken.lib import models
from lxml import etree
from PIL import Image
from sickle import Sickle

NSMAP = {'dcorp': 'http://www.dspin.de/data/textcorpus',
         'cmd': 'http://www.clarin.eu/cmd/',
         'oai': 'http://www.openarchives.org/OAI/2.0/',
         'tei': 'http://www.tei-c.org/ns/1.0'}
OAI_ENDPOINT = "http://fedora.dwds.de/oai-dta/"
HTML_PARSER = etree.HTMLParser()
# Kraken recognition model for Fraktur typefaces
MODEL = models.load_any(os.path.expanduser('~/.config/kraken/fraktur.pronn'))
TEXT_URL = "http://www.deutschestextarchiv.de/book/download_txt/{dta_id}"
IMG_URL = ("http://media.dwds.de"
           "/dta/images/{dta_id}/{dta_id}_{page_id}_1600px.jpg")
# A page break in the DTA text dump is a form feed followed by a
# bracketed page marker on its own line
PAGE_SPLIT_PAT = re.compile(r'\x0c\n(?=\[.*?\]\n)')
PAGE_ID_PAT = re.compile(r'\[(?:.*?/)?(\d{4})]')
ID_PAT = re.compile(r'^oai:dta:(.*)$')

logger = logging.getLogger()
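
# Index of the last `ocr_page` already written to an existing hOCR file
# (-1 if it is missing or unparseable); lets align_dta resume a run that
# was interrupted halfway through a volume.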
def get_last_page_index(hocr_path):
    try:
        tree = etree.parse(hocr_path, HTML_PARSER)
        return len(tree.findall(".//div[@class='ocr_page']")) - 1
    except Exception:
        return -1
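
# Split the local plain-text dump of a volume into pages keyed by their
# four-digit page id; pages that consist only of bracketed marker lines
# are skipped, and the marker line itself is dropped from each page.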
def get_text_pages(dta_id):
    with open("/mnt/data/datasets/dta/txt/{}.txt".format(dta_id)) as fp:
        text = fp.read().decode('utf8')
    pages = OrderedDict()
    for page in PAGE_SPLIT_PAT.split(text):
        lines = [l.strip() for l in page.split('\n') if l.strip() != '']
        if not lines:
            continue
        match = PAGE_ID_PAT.match(lines[0])
        if match:
            page_id = match.group(1)
        else:
            # Fall back to incrementing the previous page id
            page_id = unicode(int(pages.keys()[-1]) + 1) if pages else u'0'
        if not all(l.startswith('[') and l.endswith(']') for l in lines):
            pages[page_id] = lines[1:]
    return pages
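
# Fetch the 1600px facsimile JPEG for a page, retrying up to two more
# times on network errors.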
def get_text_image(dta_id, page_id):
    attempt = 0
    while True:
        try:
            resp = requests.get(IMG_URL.format(dta_id=dta_id, page_id=page_id),
                                timeout=30)
        except Exception:
            if attempt == 2:
                raise
            attempt += 1
            continue
        return Image.open(BytesIO(resp.content))
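
# Run the kraken pipeline on a page image: nlbin binarization, page
# segmentation into line boxes, then recognition with the Fraktur model.
# Returns the plain-text line predictions alongside an hOCR serialization
# of the same result; predictions without character cuts are filtered
# out, presumably because they cannot be located on the page.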
def get_prediction(img, img_path=None):
    logger.debug("Binarizing")
    binarized = binarization.nlbin(img, 0.5, 0.5, 1.0, 0.1, 80, 20, 5, 90)
    logger.debug("Segmenting")
    boxes = pageseg.segment(binarized, None, False)
    logger.debug("Predicting")
    preds = [pred for pred in rpred.rpred(MODEL, binarized, boxes, 16)
             if pred.cuts]
    return ([p.prediction for p in preds],
            serialization.serialize(preds, img_path or "",
                                    image_size=img.size))
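
# Merge the alignment results back into kraken's hOCR: aligned lines get
# their text replaced by the ground truth (word-level `ocrx_word` spans
# are dropped, since words are not aligned individually); lines that keep
# their word spans were not aligned and are tagged `not_aligned`.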
def get_aligned_hocr(aligned_lines, hocr):
    tree = etree.fromstring(hocr, HTML_PARSER)
    for line_idx, line_text in aligned_lines:
        line_elem = tree.find(".//span[@class='ocr_line'][{0}]"
                              .format(line_idx + 1))
        # Remove child (= ocrx_word) elements, since we don't align the
        # individual words
        for child in line_elem.getchildren():
            line_elem.remove(child)
        line_elem.text = line_text
        # Remove `cuts` information from the title
        line_elem.set('title', line_elem.get('title').split(';')[0])
    non_aligned_lines = tree.xpath(
        ".//span[@class='ocr_line'][./span[@class='ocrx_word']]")
    for line_elem in non_aligned_lines:
        line_elem.set('class', 'ocr_line not_aligned')
    return tree.find(".//div[@class='ocr_page']")
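
# Align one DTA volume: fetch page images, OCR them and match the
# transcribed lines against the OCR lines. If a target hOCR file already
# exists, processing resumes after the last page it contains. Volumes
# where a quarter or more of the pages have lines of 150+ characters are
# rejected outright, since their transcription likely lost line breaks.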
def align_dta(dta_id, image_dir=None, target_path=None):
    if image_dir and not os.path.exists(image_dir):
        os.makedirs(image_dir)
    pages = get_text_pages(dta_id)
    num_too_long = sum(1 for p in pages.values()
                       if any(len(l) >= 150 for l in p))
    too_long_ratio = num_too_long / len(pages)
    if too_long_ratio >= 0.25:
        logger.warning(
            "{:.2f}% of the pages in {} have lines with 150 or more "
            "characters; the transcription likely did not preserve "
            "line boundaries.".format(100 * too_long_ratio, dta_id))
        return None, 0, 0
    nonaligned = {}
    out_tree = None
    if target_path and os.path.exists(target_path):
        try:
            out_tree = etree.parse(target_path)
            last_idx = get_last_page_index(target_path)
            pages_to_process = list(pages.items())[last_idx+1:]
            num_aligned = out_tree.xpath("count(.//span[@class='ocr_line'])")
        except Exception:
            logger.warning("Could not parse {}".format(target_path))
            out_tree = None
    if out_tree is None:
        num_aligned = 0
        out_tree = etree.fromstring("""
        <!doctype html>
        <html>
        <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
        <meta name="ocr-system" content="dta_aligner" />
        <meta name="ocr-capabilities" content="ocr_page ocr_line" />
        </head>
        <body/>
        </html>
        """, HTML_PARSER)
        pages_to_process = list(pages.items())
    if not pages_to_process:
        return None, None, None
click.echo("Aligning {}".format(dta_id)) | |
body_elem = out_tree.find(".//body") | |
with click.progressbar(pages_to_process) as ps: | |
for page_id, page_lines in ps: | |
logger.debug("Aligning page {} from {}".format(page_id, dta_id)) | |
img_path = None | |
img = None | |
if image_dir: | |
img_path = os.path.abspath(os.path.join( | |
image_dir, "{}.jpg".format(page_id))) | |
if os.path.exists(img_path): | |
try: | |
img = Image.open(img_path) | |
except: | |
logger.warn("Could not read image from {}" | |
.format(img_path)) | |
if img is None: | |
logger.debug("Downloading page {} from {}" | |
.format(page_id, dta_id)) | |
try: | |
img = get_text_image(dta_id, page_id) | |
except Exception as e: | |
logger.exception(e) | |
logger.error("Could not retrieve image {} for {}, " | |
"skipping.".format(page_id, dta_id)) | |
continue | |
if img_path: | |
img.save(img_path) | |
logger.debug("Saved page {} from {} to {}" | |
.format(page_id, dta_id, img_path)) | |
if not page_lines: | |
logger.debug("No text on page, writing empty page.") | |
page_elem = etree.fromstring( | |
'<div class="ocr_page" title="bbox 0 0 {} {}; image {}" />' | |
.format(img.width, img.height, img_path or "")) | |
body_elem.append(page_elem) | |
continue | |
try: | |
logger.debug("Recognizing {}".format(img_path)) | |
ocr_text, hocr = get_prediction(img, img_path) | |
except Exception as e: | |
logger.exception(e) | |
logger.error("Could not recognize {}".format(img_path)) | |
continue | |
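            # Greedy, monotonic alignment: for every ground-truth line,
            # find the remaining OCR line with the lowest Levenshtein
            # distance normalized by the truth line's length, accept the
            # match if its error is below 0.5, and resume searching after
            # the matched OCR line.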
            aligned = []
            align_idx = -1  # start before the first OCR line so line 0 is searchable
            nonaligned[page_id] = []
            for truth_line in page_lines:
                best_error = 1.0
                best_align = 0
                for idx, ocr_line in enumerate(ocr_text[align_idx+1:],
                                               align_idx+1):
                    total_error = levenshtein(truth_line, ocr_line)
                    error = total_error / len(truth_line)
                    if error < best_error:
                        best_error = error
                        best_align = idx
                if best_error < 0.5:
                    num_aligned += 1
                    align_idx = best_align
                    aligned.append((align_idx, truth_line))
                    logger.debug(
                        u"Matched `{}` <-> `{}` ({})".format(
                            truth_line, ocr_text[best_align], best_error))
                else:
                    nonaligned[page_id].append(truth_line)
                    if best_align < len(ocr_text):
                        logger.debug(
                            u"Could not find match for '{}', closest "
                            u"candidate was '{}' with an error of {}"
                            .format(truth_line, ocr_text[best_align],
                                    best_error))
            page_elem = get_aligned_hocr(aligned, hocr)
            page_elem.set(
                'title',
                page_elem.get('title').replace(
                    'bbox 0 0 0 0,',
                    'bbox 0 0 {} {};'.format(img.width, img.height)))
            body_elem.append(page_elem)
            with open(target_path or "/tmp/debug.html", "w") as fp:
                fp.write(etree.tostring(out_tree, pretty_print=True))
    logger.debug("Recognition of {} finished".format(dta_id))
    return out_tree, num_aligned, nonaligned
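
# Harvest the identifiers of all non-deleted DTA records from the
# OAI-PMH endpoint.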
def get_dta_ids():
    sickle = Sickle(OAI_ENDPOINT)
    records = sickle.ListRecords(metadataPrefix="cmdi", set="dta")
    for rec in records:
        if rec.deleted:
            continue
        match = ID_PAT.match(rec.header.identifier)
        if match:
            yield match.group(1)
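
# DTA_IDS is an optional file with one volume id per line; if omitted,
# all ids are harvested via OAI-PMH and processed in reverse order.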
@click.command()
@click.argument("dta_ids", type=click.Path(dir_okay=False), required=False)
@click.option("--output-directory", "-o", default="./",
              type=click.Path(file_okay=False, writable=True))
def cli(dta_ids, output_directory):
    if not dta_ids:
        dta_ids = list(get_dta_ids())[::-1]
    else:
        with open(dta_ids) as fp:
            dta_ids = [l.strip() for l in fp]
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    for dta_id in dta_ids:
        hocr_path = os.path.join(output_directory, "{}.html".format(dta_id))
        volume_dir = os.path.join(output_directory, dta_id)
        hocr_tree, num_aligned, nonaligned = align_dta(dta_id, volume_dir,
                                                       hocr_path)
        if hocr_tree is None:
            continue
        total_num = num_aligned + sum(len(x) for x in nonaligned.values())
        if total_num:
            aligned_frac = num_aligned / total_num
        else:
            aligned_frac = 0
        click.echo("Was able to align {} of {} lines (= {:.2f}%)"
                   .format(num_aligned, total_num, 100 * aligned_frac))
        if aligned_frac < 0.05:
            click.echo("Too few alignments, please check "
                       "the data manually for mistakes in alignment.")
            logger.warning(
                "Could align less than 5% of lines in `{}`; there is likely "
                "an error in the mapping between facsimile images and "
                "transcribed pages, please verify manually.".format(dta_id))
        with open(hocr_path, "w") as fp:
            fp.write(etree.tostring(hocr_tree, pretty_print=True))
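
# Example invocation (file name and paths are illustrative):
#   python align_dta.py ids.txt -o /mnt/data/aligned
# or, to harvest and process every DTA volume:
#   python align_dta.py -o /mnt/data/aligned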
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG, filename="align.log",
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s '
               '%(message)s')
    cli()