Created
February 16, 2021 10:02
-
-
Save MaxHalford/465fd9d9fa126b7fcd3b09a4f9af19ee to your computer and use it in GitHub Desktop.
ABBYY synchronous query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import time
import urllib.parse
import xml.dom.minidom

import requests
# Task statuses after which the service will never deliver a result; polling
# must stop on these instead of looping forever.
_ABBYY_FAILED_STATUSES = ('ProcessingFailed', 'Deleted', 'NotEnoughCredits')

# Seconds allowed for any single HTTP exchange before requests gives up.
_ABBYY_REQUEST_TIMEOUT = 60


def _parse_task_response(response):
    """Extract the task description from an XML task response.

    Args:
        response: Object whose ``text`` attribute holds an XML document
            containing at least one ``<task>`` element.

    Returns:
        A dict with ``'taskId'`` and ``'status'`` taken from the first
        ``<task>`` element's ``id`` and ``status`` attributes. When the status
        is ``'Completed'`` the dict also carries ``'downloadUrl'``, read from
        the ``resultUrl`` attribute.

    Raises:
        IndexError: If the document contains no ``<task>`` element.
        xml.parsers.expat.ExpatError: If the body is not well-formed XML.
    """
    dom = xml.dom.minidom.parseString(response.text)
    task_node = dom.getElementsByTagName('task')[0]
    task = {
        'taskId': task_node.getAttribute('id'),
        'status': task_node.getAttribute('status'),
    }
    if task['status'] == 'Completed':
        task['downloadUrl'] = task_node.getAttribute('resultUrl')
    return task


def get_abbyy_transcription(doc, app, password):
    """Submit a document image to ABBYY Cloud OCR and return its text lines.

    The image is uploaded, the resulting task is polled once per second until
    it completes, and the plain-text export is downloaded and split into
    lines.

    Args:
        doc: Object exposing ``doc_uri``; only the base name of that URI is
            used, to locate the image inside a hard-coded local directory.
        app: Application ID, sent as the HTTP basic-auth user name.
        password: Application password, sent as the HTTP basic-auth password.

    Returns:
        The non-empty lines of the recognised text, in document order.

    Raises:
        OSError: If the local image file cannot be opened.
        requests.HTTPError: If any of the HTTP calls returns an error status.
        requests.Timeout: If a request exceeds ``_ABBYY_REQUEST_TIMEOUT``.
        RuntimeError: If the task ends in a status that can never complete.
    """
    proxies = {}
    auth = (app, password)

    # NOTE(review): urljoin() replaces the last path segment of a base URL
    # that has no trailing slash, so these resolve to ".../processImage" and
    # ".../getTaskStatus" (the "v2" segment is dropped). The XML parsing in
    # _parse_task_response depends on what that endpoint returns, so the
    # behaviour is kept as-is -- confirm it is intentional before "fixing".
    server_url = 'https://cloud-eu.ocrsdk.com/v2'
    request_url = urllib.parse.urljoin(server_url, 'processImage')
    status_url = urllib.parse.urljoin(server_url, 'getTaskStatus')

    # The image is read from a fixed directory on the author's machine; only
    # the file name comes from the document.
    path = f'/Users/max.halford/alan_data/osteo_invoices/{os.path.basename(doc.doc_uri)}'
    with open(path, 'rb') as image_file:
        image_data = image_file.read()

    response = requests.post(
        request_url,
        data=image_data,
        params={
            'language': 'French',
            'profile': 'textExtraction',
            'exportFormat': 'txtUnstructured',
            'txtUnstructured:paragraphAsOneLine': True,
        },
        auth=auth,
        proxies=proxies,
        timeout=_ABBYY_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    task = _parse_task_response(response)

    while task['status'] != 'Completed':
        if task['status'] in _ABBYY_FAILED_STATUSES:
            raise RuntimeError(
                f"ABBYY task {task['taskId']} ended with status {task['status']!r}"
            )
        time.sleep(1)
        response = requests.get(
            status_url,
            params={'taskId': task['taskId']},
            auth=auth,
            proxies=proxies,
            timeout=_ABBYY_REQUEST_TIMEOUT,
        )
        # Fail on HTTP errors here rather than trying to parse an error page.
        response.raise_for_status()
        task = _parse_task_response(response)

    file_response = requests.get(
        task['downloadUrl'],
        proxies=proxies,
        timeout=_ABBYY_REQUEST_TIMEOUT,
    )
    file_response.raise_for_status()
    # .content applies any Content-Encoding (gzip/deflate) before we decode,
    # which reading the raw socket stream would not.
    text = file_response.content.decode('utf-8')
    return [line for line in text.splitlines() if line]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment