Created
October 21, 2019 18:17
-
-
Save adelaide01/cea6a3ecaf700cbe9422bed60f59603a to your computer and use it in GitHub Desktop.
Batch processing in Python for ABBYY FineReader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Usage: process.py <input file> <output file> [-l <Language>] [-pdf|-txt|-rtf|-docx|-xml] | |
import argparse | |
import os | |
import time | |
from SimpleFolderProcess import * | |
# Shared SDK client; main() replaces this placeholder before it is used.
processor = None


def setup_processor():
    """Configure the module-level ``processor`` from environment variables.

    ``ABBYY_APPID`` and ``ABBYY_PWD`` overwrite the client's ``ApplicationId``
    and ``Password``; ``http_proxy`` and ``https_proxy`` are stored in its
    ``Proxies`` mapping under the keys ``"http"`` and ``"https"``. Variables
    that are not set leave the corresponding value untouched.
    """
    credential_sources = (
        ("ABBYY_APPID", "ApplicationId"),
        ("ABBYY_PWD", "Password"),
    )
    for variable, attribute in credential_sources:
        if variable in os.environ:
            setattr(processor, attribute, os.environ[variable])

    # Proxy settings
    proxy_sources = (
        ("http_proxy", "http", "Using http proxy at {}"),
        ("https_proxy", "https", "Using https proxy at {}"),
    )
    for variable, scheme, message in proxy_sources:
        if variable not in os.environ:
            continue
        proxy_string = os.environ[variable]
        print(message.format(proxy_string))
        processor.Proxies[scheme] = proxy_string
# Recognize a file at file_path and save the result to result_file_path
def recognize_file(file_path, result_file_path, language, output_format):
    """Upload one file for recognition, wait for it, and save the result.

    Args:
        file_path: Path of the file handed to ``processor.process_image``.
        result_file_path: Path the downloaded result is written to.
        language: Value assigned to ``settings.Language``.
        output_format: Value assigned to ``settings.OutputFormat``.

    Progress and failures are reported with ``print``; the function returns
    ``None`` in every case.
    """
    print("Uploading..")
    settings = ProcessingSettings()
    settings.Language = language
    settings.OutputFormat = output_format
    task = processor.process_image(file_path, settings)
    if task is None:
        print("Error")
        return
    if task.Status == "NotEnoughCredits":
        print("Not enough credits to process the document. Please add more pages to your application's account.")
        return

    print("Id = {}".format(task.Id))
    print("Status = {}".format(task.Status))

    # Wait for the task to be completed
    print("Waiting..")
    # Note: it's recommended that your application waits at least 2 seconds
    # before making the first getTaskStatus request and also between such requests
    # for the same task. Making requests more often will not improve your
    # application performance.
    # Note: if your application queues several files and waits for them
    # it's recommended that you use listFinishedTasks instead (which is described
    # at http://ocrsdk.com/documentation/apireference/listFinishedTasks/).
    while task.is_active():
        time.sleep(5)
        print(".")
        task = processor.get_task_status(task)
        if task is None:
            # get_task_status() may hand back None (the SDK class in this
            # gist does so for a null task id); stop instead of crashing on
            # the next task.is_active() call.
            print("Error")
            return

    print("Status = {}".format(task.Status))

    if task.Status == "Completed":
        if task.DownloadUrl is not None:
            processor.download_result(task, result_file_path)
            print("Result was written to {}".format(result_file_path))
    else:
        print("Error processing task")
def create_parser():
    """Build the command-line parser for the recognition script.

    The parser takes two positionals (``source_file`` and ``target_file``),
    an optional ``-l/--language`` (default ``English``) and at most one of the
    mutually exclusive format flags, whose constant is stored in ``format``
    (default ``txt``).
    """
    cli = argparse.ArgumentParser(description="Recognize a file via web service")
    for positional in ('source_file', 'target_file'):
        cli.add_argument(positional)
    cli.add_argument('-l', '--language', default='English', help='Recognition language (default: %(default)s)')

    export_formats = cli.add_mutually_exclusive_group()
    # -txt is registered first and is the only flag that supplies the default.
    export_formats.add_argument('-txt', action='store_const', const='txt', dest='format', default='txt')
    remaining_flags = (
        ('-pdf', 'pdfSearchable'),
        ('-rtf', 'rtf'),
        ('-docx', 'docx'),
        ('-xml', 'xml'),
    )
    for flag, export_name in remaining_flags:
        export_formats.add_argument(flag, action='store_const', const=export_name, dest='format')
    return cli
def main():
    """Script entry point: create the SDK client and recognize one file.

    Creates the shared ``processor``, applies the environment configuration,
    parses the command line, and either recognizes the source file or prints
    a message when that path is not an existing file.
    """
    global processor
    processor = AbbyyOnlineSdk()
    setup_processor()

    args = create_parser().parse_args()
    if not os.path.isfile(args.source_file):
        print("No such file: {}".format(args.source_file))
        return
    recognize_file(args.source_file, args.target_file, args.language, args.format)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Usage: run this script with no arguments; it sends every file in ./folder-in to the OCR service and saves each result in ./folder-out
import shutil | |
import time | |
from os import listdir | |
from os.path import isfile, join | |
import xml.dom.minidom | |
try: | |
import requests | |
except ImportError: | |
print("You need the requests library to be installed in order to use this sample.") | |
print("Run 'pip install requests' to fix it.") | |
exit() | |
class ProcessingSettings:
    """Recognition options that accompany an upload.

    Both attributes are class-level defaults; callers override them on an
    instance when they need different values.
    """

    # Recognition language
    Language = "English"

    # Export format of the recognition result
    OutputFormat = "txt"
class Task:
    """State of one processing task as reported by the server."""

    # Status string; "Unknown" until a server response has been decoded
    Status = "Unknown"
    # Task identifier
    Id = None
    # Result URL; only filled in for a completed task
    DownloadUrl = None

    def is_active(self):
        """Return True while the status is "InProgress" or "Queued"."""
        return self.Status in ("InProgress", "Queued")
class AbbyyOnlineSdk:
    """Small HTTP client for the OCR web service at ``ServerUrl``.

    ``process_image`` uploads a file, ``get_task_status`` polls the task and
    ``download_result`` saves the finished result. ``ServerUrl``,
    ``ApplicationId``, ``Password`` and ``Proxies`` may be overridden on an
    instance before making requests.
    """

    # Warning! This is for easier out-of-the box usage of the sample only. Change to https:// for production use
    ServerUrl = "http://cloud.ocrsdk.com/"
    # To create an application and obtain a password,
    # register at http://cloud.ocrsdk.com/Account/Register
    # More info on getting your application id and password at
    # http://ocrsdk.com/documentation/faq/#faq3
    # NOTE(review): credentials are hard-coded in source; prefer supplying
    # them at runtime and keep real secrets out of version control.
    ApplicationId = "testing3-2-1"
    Password = "9BQgDQpuF0MFTMKIvSwVBgsx"
    # Default proxy mapping; __init__ gives every instance its own copy.
    Proxies = {}

    def __init__(self):
        # A class-level dict would be shared by every client, so a proxy set
        # on one instance would silently apply to all of them.
        self.Proxies = dict(type(self).Proxies)

    def process_image(self, file_path, settings):
        """Upload the file at *file_path* and return the created Task.

        Args:
            file_path: Path of the file whose bytes are posted.
            settings: Object providing ``Language`` and ``OutputFormat``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        url_params = {
            "language": settings.Language,
            "exportFormat": settings.OutputFormat
        }
        request_url = self.get_request_url("processImage")

        with open(file_path, 'rb') as image_file:
            image_data = image_file.read()

        response = requests.post(request_url, data=image_data, params=url_params,
                                 auth=(self.ApplicationId, self.Password), proxies=self.Proxies)

        # Any response other than HTTP 200 means error - in this case exception will be thrown
        response.raise_for_status()

        # parse response xml and extract task ID
        return self.decode_response(response.text)

    def get_task_status(self, task):
        """Fetch the current state of *task* and return it as a new Task.

        Returns ``None`` (after printing a message) when the task id looks
        like a null GUID.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        if '00000000-0' in task.Id:
            # GUID_NULL is being passed. This may be caused by a logical error in the calling code
            print("Null task id passed")
            return None

        url_params = {"taskId": task.Id}
        status_url = self.get_request_url("getTaskStatus")

        response = requests.get(status_url, params=url_params,
                                auth=(self.ApplicationId, self.Password), proxies=self.Proxies)
        # Same policy as process_image: report HTTP errors directly instead
        # of trying to parse an error body as a task description.
        response.raise_for_status()
        return self.decode_response(response.text)

    def download_result(self, task, output_path):
        """Save the result of a completed *task* to *output_path*.

        Prints a message and returns without writing anything when the task
        has no download URL.

        Raises:
            requests.HTTPError: If the download request fails.
        """
        get_result_url = task.DownloadUrl
        if get_result_url is None:
            print("No download URL found")
            return

        file_response = requests.get(get_result_url, stream=True, proxies=self.Proxies)
        try:
            # Check the status before opening the output so a failed download
            # does not leave an error page behind as the "result".
            file_response.raise_for_status()
            with open(output_path, 'wb') as output_file:
                # iter_content applies content decoding, which reading
                # response.raw directly does not.
                for chunk in file_response.iter_content(chunk_size=64 * 1024):
                    output_file.write(chunk)
        finally:
            # Streamed responses hold their connection until closed.
            file_response.close()

    def decode_response(self, xml_response):
        """ Decode xml response of the server. Return Task object """
        dom = xml.dom.minidom.parseString(xml_response)
        task_node = dom.getElementsByTagName("task")[0]
        task = Task()
        task.Id = task_node.getAttribute("id")
        task.Status = task_node.getAttribute("status")
        # The result URL is only read for a completed task
        if task.Status == "Completed":
            task.DownloadUrl = task_node.getAttribute("resultUrl")
        return task

    def get_request_url(self, url):
        """Join ``ServerUrl`` and *url* with exactly one slash between them."""
        return self.ServerUrl.strip('/') + '/' + url.strip('/')
def processOneFile(aos_, source_, destination_):
    """Recognize one file with default settings and save the result.

    Args:
        aos_: SDK client providing ``process_image``, ``get_task_status``
            and ``download_result``.
        source_: Path of the file to upload.
        destination_: Path the result is saved to.

    The task is polled every 5 seconds. Polling stops without downloading
    when the status cannot be fetched or the task ends in a failed state.
    """
    print("input: " + source_)
    print("sending image ...")
    settings = ProcessingSettings()
    task = aos_.process_image(source_, settings)

    # Statuses after which the task will never become "Completed"; without
    # this check a failed task kept the loop spinning forever.
    terminal_failures = ("ProcessingFailed", "Deleted", "NotEnoughCredits")

    result = None
    status = None
    while status != "Completed":
        time.sleep(5)
        print("checking...")
        result = aos_.get_task_status(task)
        if result is None:
            # get_task_status() returns None when it rejects the task id
            print("could not get task status for: " + source_)
            return
        status = result.Status
        print("task status: " + status)
        if status in terminal_failures:
            print("processing failed for: " + source_)
            return

    print("save output to: " + destination_)
    aos_.download_result(result, destination_)
def processFiles(aos_, sourceFolder_, destinationFolder_):
    """Process every regular file in one folder and put results in the other.

    Args:
        aos_: SDK client passed through to ``processOneFile``.
        sourceFolder_: Folder whose files are processed; sub-folders and
            other non-file entries are skipped.
        destinationFolder_: Folder that receives one ``<source name>.txt``
            result per processed file.
    """
    # Sorted so that files are handled in a predictable order.
    for sourceFile in sorted(listdir(sourceFolder_)):
        fullSourcePath = join(sourceFolder_, sourceFile)
        if not isfile(fullSourcePath):
            continue
        # The "." was missing before, turning "scan.png" into "scan.pngtxt".
        fullDestinationPath = join(destinationFolder_, sourceFile + ".txt")
        processOneFile(aos_, fullSourcePath, fullDestinationPath)
# Run the batch only when this file is executed as a script. Without the
# guard, merely importing the module (for example to reuse AbbyyOnlineSdk)
# would start uploading everything in "folder-in".
if __name__ == "__main__":
    print("We start")
    aos = AbbyyOnlineSdk()
    processFiles(aos, "folder-in", "folder-out")
    print("Done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment