Skip to content

Instantly share code, notes, and snippets.

@jbaiter
Last active September 28, 2017 13:51
Show Gist options
  • Save jbaiter/83057595be6c10655ea68713fcdb47da to your computer and use it in GitHub Desktop.
Dependencies: `pip install click requests editdistance kraken lxml pillow-simd sickle`
from __future__ import division
import json
import logging
import os
import re
from collections import OrderedDict
from io import BytesIO
import click
import requests
from editdistance import eval as levenshtein
from kraken import binarization, pageseg, rpred, serialization
from kraken.lib import models
from lxml import etree
from PIL import Image
from sickle import Sickle
# XML namespace prefixes used for querying OAI/CMDI/TEI metadata and hOCR.
NSMAP = {'dcorp': 'http://www.dspin.de/data/textcorpus',
         'cmd': 'http://www.clarin.eu/cmd/',
         'oai': 'http://www.openarchives.org/OAI/2.0/',
         'tei': 'http://www.tei-c.org/ns/1.0'}
# OAI-PMH endpoint of the Deutsches Textarchiv, used to enumerate volume ids.
OAI_ENDPOINT = "http://fedora.dwds.de/oai-dta/"
# Shared lxml HTML parser instance (hOCR output is HTML, not strict XML).
HTML_PARSER = etree.HTMLParser()
# kraken recognition model for Fraktur type, loaded once at import time.
MODEL = models.load_any(os.path.expanduser('~/.config/kraken/fraktur.pronn'))
# URL template for a volume's plain-text transcription download.
TEXT_URL = "http://www.deutschestextarchiv.de/book/download_txt/{dta_id}"
# URL template for a single page facsimile image (1600px wide rendition).
IMG_URL = ("http://media.dwds.de"
           "/dta/images/{dta_id}/{dta_id}_{page_id}_1600px.jpg")
# Form-feed followed by a bracketed page label marks a page boundary in the
# text dump.
PAGE_SPLIT_PAT = re.compile(r'\x0c\n(?=\[.*?\]\n)')
# Captures the 4-digit page id from labels like "[0042]" or "[foo/0042]".
PAGE_ID_PAT = re.compile(r'\[(?:.*?/)?(\d{4})]')
# Strips the "oai:dta:" prefix from OAI record identifiers.
ID_PAT = re.compile(r'^oai:dta:(.*)$')
logger = logging.getLogger()
def get_last_page_index(hocr_path):
    """Return the index of the last ``ocr_page`` div in an hOCR file.

    Used to resume interrupted runs: pages up to and including this index
    have already been processed.

    :param hocr_path: path to a previously written hOCR HTML file
    :returns: zero-based index of the last page, or -1 if the file is
        missing, unparseable or contains no pages
    """
    try:
        tree = etree.parse(hocr_path, HTML_PARSER)
        # len(...) - 1 is exactly the enumerate(...)[-1][0] of the original,
        # and conveniently evaluates to -1 for an empty result as well.
        return len(tree.findall(".//div[@class='ocr_page']")) - 1
    except Exception:
        # FIX: was a bare `except:` (would also swallow KeyboardInterrupt).
        # Missing/broken file simply means "no pages processed yet".
        return -1
def get_text_pages(dta_id):
    """Parse the local DTA plain-text dump of a volume into per-page lines.

    The dump separates pages with a form feed followed by a bracketed page
    label (see ``PAGE_SPLIT_PAT``/``PAGE_ID_PAT``).

    :param dta_id: DTA volume identifier
    :returns: OrderedDict mapping page id (unicode string) -> list of
        non-empty, stripped text lines (the label line itself is dropped).
        Pages consisting only of bracketed label lines are skipped.
    """
    with open("/mnt/data/datasets/dta/txt/{}.txt".format(dta_id)) as fp:
        text = fp.read().decode('utf8')
    pages = OrderedDict()
    for page in PAGE_SPLIT_PAT.split(text):
        lines = [l.strip() for l in page.split('\n') if l.strip() != '']
        # FIX: guard against pages that are empty after stripping; the
        # original indexed lines[0] unconditionally and could raise
        # IndexError.
        if not lines:
            continue
        match = PAGE_ID_PAT.match(lines[0])
        if match:
            page_id = match.group(1)
        else:
            # No label: continue the numbering from the previous page.
            # FIX: the fallback for the very first page was the int 0 while
            # every other key is a unicode string; keep key types uniform.
            # NOTE(review): ids elsewhere are zero-padded to 4 digits
            # ("0042") but the increment here is not padded -- confirm
            # against the image URL scheme before relying on it.
            page_id = unicode(int(pages.keys()[-1]) + 1) if pages else u'0'
        if not all(l.startswith('[') and l.endswith(']') for l in lines):
            pages[page_id] = lines[1:]
    return pages
def get_text_image(dta_id, page_id):
    """Download the facsimile image for a single page.

    Retries up to three times on network errors or HTTP error statuses.

    :param dta_id: DTA volume identifier
    :param page_id: page identifier as used in the image URL
    :returns: :class:`PIL.Image.Image` with the page facsimile
    :raises Exception: the last underlying error after three failed attempts
    """
    for attempt in range(3):
        try:
            resp = requests.get(IMG_URL.format(dta_id=dta_id, page_id=page_id),
                                timeout=30)
            # FIX: the original never checked the HTTP status, so a 404/500
            # body would be handed to Image.open and fail confusingly there.
            resp.raise_for_status()
        except Exception:
            if attempt == 2:
                # FIX: bare `raise` instead of `raise e` keeps the original
                # traceback (Python 2 semantics).
                raise
            continue
        return Image.open(BytesIO(resp.content))
def get_prediction(img, img_path=None):
    """Run the kraken OCR pipeline (binarize, segment, recognize) on a page.

    :param img: PIL image of the page facsimile
    :param img_path: optional path recorded in the hOCR serialization
    :returns: tuple of (list of recognized line strings, hOCR string)
    """
    logger.debug("Binarizing")
    bw_img = binarization.nlbin(img, 0.5, 0.5, 1.0, 0.1, 80, 20, 5, 90)
    logger.debug("Segmenting")
    line_boxes = pageseg.segment(bw_img, None, False)
    logger.debug("Predicting")
    # Keep only predictions that carry cut information; empty predictions
    # cannot be serialized meaningfully.
    predictions = []
    for pred in rpred.rpred(MODEL, bw_img, line_boxes, 16):
        if pred.cuts:
            predictions.append(pred)
    line_texts = [p.prediction for p in predictions]
    hocr = serialization.serialize(predictions, img_path or "",
                                   image_size=img.size)
    return line_texts, hocr
def get_aligned_hocr(aligned_lines, hocr):
    """Substitute aligned ground-truth text into an hOCR document.

    For every aligned line the ``ocrx_word`` children are dropped (alignment
    is line-level only) and the line text is replaced by the ground truth.
    Lines that were not aligned keep their OCR words and get the extra CSS
    class ``not_aligned``.

    :param aligned_lines: iterable of (line index, ground-truth text) pairs
    :param hocr: hOCR document as a string
    :returns: the ``ocr_page`` div element of the modified document
    """
    tree = etree.fromstring(hocr, HTML_PARSER)
    for idx, truth_text in aligned_lines:
        elem = tree.find(".//span[@class='ocr_line'][{0}]"
                         .format(idx + 1))
        # Drop the word-level children; we only align whole lines.
        for word_elem in list(elem):
            elem.remove(word_elem)
        elem.text = truth_text
        # Keep only the bbox part of the title, dropping `cuts` information.
        elem.set('title', elem.get('title').split(';')[0])
    # Any line still carrying ocrx_word children was never aligned above.
    unaligned = tree.xpath(
        ".//span[@class='ocr_line'][./span[@class='ocrx_word']]")
    for elem in unaligned:
        elem.set('class', 'ocr_line not_aligned')
    return tree.find(".//div[@class='ocr_page']")
def align_dta(dta_id, image_dir=None, target_path=None):
    """Align the DTA ground-truth transcription of a volume with OCR output.

    Downloads (or reuses cached) page images, OCRs them with kraken, then
    matches each ground-truth line against the OCR lines by normalized
    Levenshtein distance.  If ``target_path`` already contains a partial
    result, processing resumes after the last completed page.

    :param dta_id: DTA volume identifier
    :param image_dir: directory used to cache downloaded page images
    :param target_path: path of the (possibly pre-existing) hOCR output file
    :returns: tuple of (hOCR element tree or None, number of aligned lines,
        dict mapping page id -> list of unaligned ground-truth lines).
        Returns ``(None, 0, 0)`` when the transcription looks unusable and
        ``(None, None, None)`` when there is nothing left to process.
    """
    if not os.path.exists(image_dir):
        os.makedirs(image_dir)
    pages = get_text_pages(dta_id)
    # Heuristic sanity check: transcriptions that did not preserve line
    # breaks have very long "lines" and cannot be aligned line-by-line.
    num_too_long = sum(1 for p in pages.values()
                       if any(len(l) >= 150 for l in p))
    too_long_ratio = (num_too_long / len(pages))
    if too_long_ratio >= 0.25:
        logger.warn("{:.2f}% of the pages in {} have lines with more than 150 "
                    "characters, likely the transcription did not preserve "
                    "line-boundaries.".format(100*too_long_ratio, dta_id))
        return None, 0, 0
    nonaligned = {}
    out_tree = None
    # Resume support: reuse an existing output file and skip finished pages.
    if target_path and os.path.exists(target_path):
        try:
            out_tree = etree.parse(target_path)
            last_idx = get_last_page_index(target_path)
            pages_to_process = list(pages.items())[last_idx+1:]
            num_aligned = out_tree.xpath("count(.//span[@class='ocr_line'])")
        except:
            # NOTE(review): bare except -- also catches KeyboardInterrupt;
            # intentionally best-effort, falls through to a fresh document.
            logger.warn("Could not parse {}".format(target_path))
    if out_tree is None:
        # Start from scratch with a minimal hOCR skeleton document.
        num_aligned = 0
        out_tree = etree.fromstring("""
<!doctype html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="ocr-system" content="dta_aligner" />
<meta name="ocr-capabilities" content="ocr_page ocr_line" />
</head>
<body/>
</html>
""", HTML_PARSER)
        pages_to_process = list(pages.items())
    if not pages_to_process:
        return None, None, None
    click.echo("Aligning {}".format(dta_id))
    body_elem = out_tree.find(".//body")
    with click.progressbar(pages_to_process) as ps:
        for page_id, page_lines in ps:
            logger.debug("Aligning page {} from {}".format(page_id, dta_id))
            img_path = None
            img = None
            # Prefer a previously cached image on disk over re-downloading.
            if image_dir:
                img_path = os.path.abspath(os.path.join(
                    image_dir, "{}.jpg".format(page_id)))
                if os.path.exists(img_path):
                    try:
                        img = Image.open(img_path)
                    except:
                        logger.warn("Could not read image from {}"
                                    .format(img_path))
            if img is None:
                logger.debug("Downloading page {} from {}"
                             .format(page_id, dta_id))
                try:
                    img = get_text_image(dta_id, page_id)
                except Exception as e:
                    logger.exception(e)
                    logger.error("Could not retrieve image {} for {}, "
                                 "skipping.".format(page_id, dta_id))
                    continue
                # Cache the freshly downloaded image for later runs.
                if img_path:
                    img.save(img_path)
                    logger.debug("Saved page {} from {} to {}"
                                 .format(page_id, dta_id, img_path))
            if not page_lines:
                # Keep page numbering intact by emitting an empty page div.
                logger.debug("No text on page, writing empty page.")
                page_elem = etree.fromstring(
                    '<div class="ocr_page" title="bbox 0 0 {} {}; image {}" />'
                    .format(img.width, img.height, img_path or ""))
                body_elem.append(page_elem)
                continue
            try:
                logger.debug("Recognizing {}".format(img_path))
                ocr_text, hocr = get_prediction(img, img_path)
            except Exception as e:
                logger.exception(e)
                logger.error("Could not recognize {}".format(img_path))
                continue
            aligned = []
            align_idx = 0
            nonaligned[page_id] = []
            # Greedy monotonic alignment: for each ground-truth line, find
            # the best-matching OCR line strictly after the previous match.
            for truth_line in page_lines:
                best_error = 1.0
                best_align = 0
                for idx, ocr_line in enumerate(ocr_text[align_idx+1:],
                                               align_idx+1):
                    total_error = levenshtein(truth_line, ocr_line)
                    # Normalize by ground-truth length for a comparable rate.
                    error = total_error / len(truth_line)
                    if error < best_error:
                        best_error = error
                        best_align = idx
                # Accept a match only below 50% character error rate.
                if best_error < 0.5:
                    num_aligned += 1
                    align_idx = best_align
                    aligned.append((align_idx, truth_line))
                    logger.debug(
                        u"Matched `{}` <-> `{}` ({})".format(
                            truth_line, ocr_text[best_align], best_error))
                else:
                    nonaligned[page_id].append(truth_line)
                    if best_align < len(ocr_text):
                        logger.debug(
                            u"Could not find match for '{}', closest candidate "
                            u"was '{}' with an error of {}"
                            .format(truth_line, ocr_text[best_align], best_error))
            page_elem = get_aligned_hocr(aligned, hocr)
            # Patch the page bbox, which the serializer leaves as 0 0 0 0.
            page_elem.set(
                'title',
                page_elem.get('title').replace(
                    'bbox 0 0 0 0,',
                    'bbox 0 0 {} {};'.format(img.width, img.height)))
            body_elem.append(page_elem)
            # Persist after every page so interrupted runs can resume.
            # NOTE(review): placement inside the loop inferred from the
            # resume logic above -- original indentation was lost.
            with open(target_path or "/tmp/debug.html", "w") as fp:
                fp.write(etree.tostring(out_tree, pretty_print=True))
    logger.debug("Recognition of {} finished".format(dta_id))
    return out_tree, num_aligned, nonaligned
def get_dta_ids():
    """Yield every available DTA volume identifier from the OAI endpoint.

    Deleted records are skipped; identifiers are stripped of their
    ``oai:dta:`` prefix.
    """
    harvester = Sickle(OAI_ENDPOINT)
    for record in harvester.ListRecords(metadataPrefix="cmdi", set="dta"):
        if record.deleted:
            continue
        id_match = ID_PAT.match(record.header.identifier)
        if id_match is None:
            continue
        yield id_match.group(1)
@click.command()
@click.argument("dta_ids", type=click.Path(dir_okay=False), required=False)
@click.option("--output-directory", "-o", default="./",
              type=click.Path(file_okay=False, writable=True))
def cli(dta_ids, output_directory):
    """Align DTA transcriptions with OCR output for one or more volumes.

    DTA_IDS is an optional file with one DTA identifier per line; when
    omitted, all identifiers are harvested from the OAI endpoint.
    """
    if not dta_ids:
        # NOTE(review): the listing is processed in reverse order -- reason
        # not evident from the code, possibly to start with recent volumes.
        dta_ids = list(get_dta_ids())[::-1]
    else:
        with open(dta_ids) as fp:
            dta_ids = [l.strip() for l in fp]
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    for dta_id in dta_ids:
        hocr_path = os.path.join(output_directory, "{}.html".format(dta_id))
        volume_dir = os.path.join(output_directory, dta_id)
        hocr_tree, num_aligned, nonaligned = align_dta(dta_id, volume_dir,
                                                       hocr_path)
        if hocr_tree is None:
            continue
        total_num = num_aligned + sum(len(x) for x in nonaligned.values())
        if total_num:
            aligned_frac = num_aligned / total_num
        else:
            aligned_frac = 0
        click.echo("Was able to align {} of {} lines (= {:.2f}%)"
                   .format(num_aligned, total_num, 100*aligned_frac))
        if aligned_frac < 0.05:
            # FIX: repaired the garbled user message ("alignmen.t") and made
            # the logged percentage agree with the 5% threshold tested above
            # (the original log message claimed 10%).
            click.echo("Too few alignments, please check "
                       "the data manually for mistakes in alignment.")
            logger.warn(
                "Could align less than 5% of lines in `{}`, there likely is "
                "an error in the mapping between facsimile images and "
                "transcribed pages, please verify manually.".format(dta_id))
        with open(hocr_path, "w") as fp:
            fp.write(etree.tostring(hocr_tree, pretty_print=True))
if __name__ == '__main__':
    # Log verbosely to a file; stdout is reserved for click's progress output.
    logging.basicConfig(
        level=logging.DEBUG, filename="align.log",
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s '
               '%(message)s')
    cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment