Last active
December 3, 2020 11:43
-
-
Save dmgig/6de6eed782ea527abfd1cd715f99ec5f to your computer and use it in GitHub Desktop.
Parallel (multi-process) OCR with Tesseract, textcleaner, and ImageMagick
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os | |
import sys | |
import getopt | |
import subprocess | |
import time | |
import pytesseract | |
import argparse | |
import cv2 | |
import shutil | |
import logging | |
import glob | |
import logging | |
import math | |
import calendar | |
from subprocess import Popen | |
from PIL import Image | |
# CONF
# How many external processes are launched at once for each stage.
TESSCHUNK = 6  # tesseract
TEXTCHUNK = 3  # textcleaner
CONVCHUNK = 3  # ImageMagick convert

# Working sub-directory names.
LOGSDIR = '_logs'  # holds error.log
IMGSDIR = '_imgs'  # per-page PNGs rendered from a PDF
TEXTDIR = '_text'  # tesseract output
DONEDIR = '_done'  # PDFs are moved here once processed

# Run statistics: documents finished so far and the start time of the
# whole run, as integer seconds since the epoch (UTC).
documentCounter = 0
timeStart = calendar.timegm(time.gmtime())

# Create the bookkeeping directories in the current working directory.
for _requiredDir in (DONEDIR, LOGSDIR):
    if not os.path.exists(_requiredDir):
        os.mkdir(_requiredDir)

logging.basicConfig(filename=LOGSDIR + '/error.log', level=logging.DEBUG)
# FUNCS
# time conv https://stackoverflow.com/questions/4048651/python-function-to-convert-seconds-into-minutes-hours-and-days/4048773
def dhmsTime(q):
    """Format a duration of ``q`` seconds as days, hours, minutes and seconds.

    Returns a string such as ``"1 days, 1 hours, 1 minutes, 1 seconds"``;
    the unit words are always plural, whatever the count.
    """
    # Peel off the largest unit first and carry the remainder down.
    wholeDays, leftover = divmod(q, 86400)
    wholeHours, leftover = divmod(leftover, 3600)
    wholeMinutes, wholeSeconds = divmod(leftover, 60)
    return "%i days, %i hours, %i minutes, %i seconds" % (
        wholeDays,
        wholeHours,
        wholeMinutes,
        wholeSeconds,
    )
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
def chunks(l, n):
    """Generate consecutive slices of ``l``, each holding at most ``n`` items.

    The final slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    startOffsets = range(0, len(l), n)
    yield from (l[start:start + n] for start in startOffsets)
# loop through files in dir
def listFiles(dir):
    """Print the PDF glob pattern for ``dir`` and every regular file matching it.

    Args:
        dir: Directory to scan (not recursive). The name shadows the
            builtin but is kept so existing callers keep working.
    """
    # Use the directory that was passed in rather than a module-level global,
    # so the function works for any caller.
    pattern = os.path.join(dir, '*.pdf')
    print(pattern)
    for pdf in glob.glob(pattern):
        if os.path.isfile(pdf):
            print('file item=', pdf)
def prepFileDirs(dir, type):
    """Prepare working directories for each ``*.<type>`` file in ``dir`` and process it.

    For every matching regular file the image, text and done directories
    are created next to it (if missing) and the file is handed to the
    matching handler.

    Args:
        dir: Directory to scan (not recursive).
        type: File extension without the dot. ``'pdf'`` files go to
            ``prepPdf``; ``'PNG'`` files go to ``prepPng``. Any other
            value only creates the directories.

    The parameter names shadow builtins but are kept so existing callers
    keep working.
    """
    # Scan the directory that was passed in rather than a module-level global.
    for filePath in glob.glob(os.path.join(dir, '*.' + type)):
        if not os.path.isfile(filePath):
            continue

        dirName = os.path.dirname(filePath)
        dirMain = dirName
        dirImgs = os.path.join(dirMain, IMGSDIR)
        dirText = os.path.join(dirMain, TEXTDIR)
        dirDone = os.path.join(dirMain, DONEDIR)
        print(dirName, ' ', dirMain, ' ', dirImgs, ' ', dirText)

        # Created per file because the image directory is removed again
        # after every PDF. dirDone is included so the final move in prepPdf
        # has a destination even when dir is not the working directory.
        for workDir in (dirImgs, dirText, dirDone):
            os.makedirs(workDir, exist_ok=True)

        if type == 'pdf':
            prepPdf(filePath, dirName, dirMain, dirImgs, dirText, dirDone)
            # Page images are only intermediates; drop them once OCR is done.
            shutil.rmtree(dirImgs)
        elif type == 'PNG':
            # NOTE(review): prepPng is not defined in the visible part of
            # this file - confirm it exists before calling with type 'PNG'.
            prepPng(filePath, dirName, dirMain, dirImgs, dirText)
def appendToFilename(filePath, append):
    """Return ``filePath`` with ``append`` inserted just before the extension.

    Example: ``appendToFilename('scans/page.png', '_clean')`` gives
    ``'scans/page_clean.png'``.

    Args:
        filePath: Path to a file; it may be a bare file name.
        append: Text to add to the end of the file's stem.
    """
    dirName, baseName = os.path.split(filePath)
    stem, extension = os.path.splitext(baseName)
    # os.path.join leaves a bare file name relative instead of prefixing it
    # with '/', which would silently point at the filesystem root.
    return os.path.join(dirName, stem + append + extension)
def _runInChunks(imgFiles, chunkSize, label, buildCommand):
    """Run one external command per image, at most ``chunkSize`` at a time.

    Args:
        imgFiles: List of image paths to process.
        chunkSize: Number of processes started together.
        label: Tool name used in progress output and error logs.
        buildCommand: Callable mapping an image path to its argv list.
    """
    totalChunks = math.ceil(len(imgFiles) / chunkSize)
    for chunkNo, chunk in enumerate(chunks(imgFiles, chunkSize), start=1):
        print("Chunk %d of %d" % (chunkNo, totalChunks))
        # Start the whole chunk before waiting so its processes run in parallel.
        running = [
            (
                imgFile,
                subprocess.Popen(
                    buildCommand(imgFile),
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                ),
            )
            for imgFile in chunk
        ]
        pending = len(running)
        print("Waiting for %d %s processes..." % (pending, label))
        for imgFile, proc in running:
            # communicate() drains both pipes, so a child that writes a lot
            # cannot block on a full pipe, and it reaps exactly this child
            # (os.wait() would collect any child of the interpreter).
            _, errOutput = proc.communicate()
            if proc.returncode != 0:
                logging.error(
                    "%s failed for %s (exit status %d): %s",
                    label,
                    imgFile,
                    proc.returncode,
                    errOutput.decode(errors='replace').strip(),
                )
            pending -= 1
            print("Waiting for %d %s processes..." % (pending, label))


def prepPdf(pdfPath, dirName, dirMain, imgPath, textPath, dirDone):
    """OCR one PDF: render pages, clean them up, run tesseract, archive the PDF.

    Steps, each run over all page images in parallel batches:
      1. Ghostscript renders every page to a 300 dpi grayscale PNG in
         ``imgPath``.
      2. ImageMagick ``convert`` stretches contrast to remove gray
         backgrounds (in place).
      3. ``textcleaner`` enhances the images (in place).
      4. ``tesseract`` writes its output for each page into ``textPath``.
    Finally the PDF is moved into ``dirDone`` and timing statistics are
    printed. Failures of the external tools are written to the error log;
    processing continues regardless.

    Args:
        pdfPath: Path of the PDF to process.
        dirName: Unused; kept for interface compatibility.
        dirMain: Unused; kept for interface compatibility.
        imgPath: Directory receiving the intermediate page images.
        textPath: Directory receiving the tesseract output.
        dirDone: Directory the PDF is moved to when finished.
    """
    global documentCounter
    timeStartDoc = calendar.timegm(time.gmtime())
    docName = os.path.splitext(os.path.basename(pdfPath))[0]

    print("\n")
    print(pdfPath)
    print(docName)
    print("\n\n")

    # convert pdf to png
    gsStatus = subprocess.call([
        'gs', '-dNOPAUSE', '-dBATCH', '-dTextAlphaBits=4',
        '-dGraphicsAlphaBits=4', '-sDEVICE=pnggray', '-r300x300',
        '-sCompression=none',
        '-sOutputFile=' + imgPath + '/' + docName + '_%d.png',
        pdfPath,
    ])
    if gsStatus != 0:
        logging.error("gs exited with status %d for %s", gsStatus, pdfPath)

    pngPattern = imgPath + '/*.png'

    # remove gray backgrounds
    _runInChunks(
        glob.glob(pngPattern),
        CONVCHUNK,
        'convert',
        lambda imgFile: [
            'convert', imgFile, '-set', 'colorspace', 'gray',
            '-contrast-stretch', '4x80%', imgFile,
        ],
    )

    # image enhancement/restoration
    _runInChunks(
        glob.glob(pngPattern),
        TEXTCHUNK,
        'textcleaner',
        lambda imgFile: ['textcleaner', '-u', '-T', '-p', '20', imgFile, imgFile],
    )

    # tesseract (TESSCHUNK processes at a time)
    # https://stackoverflow.com/questions/3194018/wait-the-end-of-subprocesses-with-multiple-parallel-jobs
    # NOTE(review): '-v' looks like tesseract's version flag - confirm it does
    # not short-circuit OCR on the installed tesseract release.
    _runInChunks(
        glob.glob(pngPattern),
        TESSCHUNK,
        'tesseract',
        lambda imgFile: [
            'tesseract', imgFile,
            textPath + '/' + os.path.splitext(os.path.basename(imgFile))[0],
            '-v', '-l', 'eng',
        ],
    )

    # move finished pdf
    shutil.move(pdfPath, dirDone + '/')

    documentCounter += 1
    curTime = calendar.timegm(time.gmtime())
    print(curTime)
    runningTimeString = dhmsTime(curTime - timeStart)
    print(runningTimeString)
    documentTimeString = dhmsTime(curTime - timeStartDoc)
    print(documentTimeString)
    print("\n\nCompleted %d documents in %s" % (documentCounter, runningTimeString))
    print("Last doc took %s\n\n" % documentTimeString)
# WORK
# List, then OCR, every PDF in the current working directory.
PATH_DOCSO = PATH_DOCS = '.'

listFiles(PATH_DOCS)
prepFileDirs(PATH_DOCS, 'pdf')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment