Created
December 13, 2020 11:50
-
-
Save dynamicguy/2248c08b81f1477c46af8a47a87ed866 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# encoding: utf-8
from __future__ import absolute_import, unicode_literals | |
# from tesserocr import PyTessBaseAPI | |
from PIL import Image | |
import hashlib | |
import os | |
import datetime as dt | |
from django.conf import settings | |
import cv2 as cv | |
import numpy as np | |
from celery import shared_task | |
from model_utils.models import now | |
import imutils | |
from nid.users.models import User | |
from .common import auto_crop, get_grayscale, remove_noise, deskew_img | |
from .models import Output, Document, Credit | |
import pytesseract | |
# Command-line options handed to Tesseract through pytesseract:
#   --oem 1  selects the LSTM neural-net OCR engine,
#   --psm 6  treats the image as a single uniform block of text,
#   -c tessedit_char_blacklist=|  stops "|" from being emitted.
# (Flag meanings per the Tesseract CLI documentation.)
config = "--oem 1 --psm 6 -c tessedit_char_blacklist=|"
# Result keys, matched against the recognised lines starting from the LAST
# line and walking upwards (last line -> "id_no", the one above -> "dob", ...).
_NID_FIELD_KEYS = (
    "id_no",
    "dob",
    "bn_mother",
    "bn_father",
    "en_name",
    "bn_name",
)

_INSUFFICIENT_FUNDS_MESSAGE = "Insufficient funds. Please recharge your account."


def _parse_nid_fields(text):
    """Map the trailing lines of OCR output onto the known result keys.

    ``text`` is split into lines; lines of two characters or fewer (after
    stripping whitespace) are dropped as noise.  The remaining lines are
    paired with ``_NID_FIELD_KEYS`` from the bottom up.  For a line that
    contains ``:`` only the part after the last colon is kept, otherwise the
    whole line is used.  Fewer than six usable lines simply yields fewer keys.
    """
    stripped = (line.strip() for line in text.splitlines())
    lines = [line for line in stripped if len(line) > 2]

    fields = {}
    for key, line in zip(_NID_FIELD_KEYS, reversed(lines)):
        # ``in`` rather than ``str.find``: find() returns -1 (truthy) when the
        # colon is missing and 0 (falsy) when it is the first character.
        fields[key] = line.split(":")[-1].strip() if ":" in line else line
    return fields


def _reject_for_insufficient_funds(document, user):
    """Record an 'insufficient funds' Output and return its message string."""
    output = Output.objects.create(
        document=document,
        pub_date=now(),
        user=user,
        result=_INSUFFICIENT_FUNDS_MESSAGE,
    )
    return output.result


def _ocr_and_charge(image_path, document, user, lang):
    """OCR ``image_path``, store the raw text as an Output, charge ``user``.

    Returns the dict produced by ``_parse_nid_fields``.  Any exception (bad
    image, Tesseract failure, database error) propagates to the caller; the
    user is only charged after the Output row has been created.
    """
    # Close the file handle as soon as the pixel data has been converted.
    with Image.open(image_path) as source:
        img = source.convert("LA")

    text = pytesseract.image_to_string(img, lang=lang, config=config)
    print(len(text), text)

    result = _parse_nid_fields(text)
    Output.objects.create(
        document=document, pub_date=now(), user=user, result=text
    )
    charge(user)
    return result


@shared_task
def extract_text(document_id, username, lang="ben"):
    """Celery task: OCR the file attached to a Document.

    Args:
        document_id: primary key of the ``Document`` to process.
        username: username of the ``User`` who is billed for the extraction.
        lang: Tesseract language code passed to pytesseract.

    Returns:
        A dict with whichever of ``id_no``, ``dob``, ``bn_mother``,
        ``bn_father``, ``en_name``, ``bn_name`` could be read, or the
        "Insufficient funds" message string when ``can_extract`` refuses.

    Raises:
        Whatever the model lookups, PIL, pytesseract or the ORM raise.  Errors
        are no longer swallowed by a ``return`` inside ``finally``.
    """
    print("received task for: %s" % document_id)
    document = Document.objects.get(id=document_id)
    user = User.objects.get(username=username)
    if not can_extract(user):
        return _reject_for_insufficient_funds(document, user)

    image_path = document.doc.path
    print("processing task for: %s" % image_path)
    return _ocr_and_charge(image_path, document, user, lang)


@shared_task
def extract_pre_processed_text(
    document_id, pre_processed_file_path, username, lang="ben"
):
    """Celery task: OCR an already pre-processed image of a Document.

    Args:
        document_id: primary key of the ``Document`` the image belongs to.
        pre_processed_file_path: path of the image relative to
            ``settings.MEDIA_ROOT``.
        username: username of the ``User`` who is billed for the extraction.
        lang: Tesseract language code passed to pytesseract.

    Returns:
        Same as ``extract_text``: the parsed field dict, or the
        "Insufficient funds" message string.

    Raises:
        Whatever the model lookups, PIL, pytesseract or the ORM raise.
    """
    document = Document.objects.get(id=document_id)
    user = User.objects.get(username=username)
    if not can_extract(user):
        return _reject_for_insufficient_funds(document, user)

    full_path = os.path.join(settings.MEDIA_ROOT, pre_processed_file_path)
    print("processing task for: %s" % full_path)
    return _ocr_and_charge(full_path, document, user, lang)
def pre_process(doc, bw, threshold, blur, enhance, reset, autocrop, deskew):
    """Apply the requested clean-up steps to ``doc`` and save a new image.

    Every flag is the string ``"true"`` to enable its step; any other value
    disables it.  Steps run in this fixed order: deskew, autocrop, enhance,
    bw, threshold, blur.

    Args:
        doc: file object exposing ``.path`` (file on disk) and ``.name``.
        bw: convert to grayscale (kept as a 3-channel image).
        threshold: binarise with ``apply_threshold`` method 3.
        blur: run ``remove_noise``.
        enhance: dilate then erode with a 1x1 kernel.
        reset: when ``"true"`` nothing is processed and ``doc.name`` is
            returned unchanged.
        autocrop: crop to the edges found by Canny via ``auto_crop``.
        deskew: straighten the image with ``deskew_img``.

    Returns:
        ``doc.name`` with ``_processed_<ISO timestamp>`` inserted before the
        extension.  The image itself is written next to ``doc.path`` using the
        same suffix.

    Raises:
        IOError: if the source image cannot be read or the result cannot be
            written.  (Write failures used to be hidden by a ``return`` inside
            ``finally``, so callers received a path to a file that did not
            exist.)
    """
    if reset == "true":
        return doc.name

    image_path = doc.path
    img = cv.imread(image_path)
    if img is None:
        # cv.imread signals an unreadable file by returning None, not raising.
        raise IOError("Could not read image: %s" % image_path)
    img = imutils.resize(img, height=500)

    if deskew == "true":
        gray = get_grayscale(img)
        # NOTE(review): if deskew_img returns a single-channel image, the
        # later BGR2GRAY conversions (autocrop / bw / threshold) would fail;
        # confirm against .common.
        img = deskew_img(gray)

    if autocrop == "true":
        gray = get_grayscale(img)
        kernel = np.ones((5, 5), np.uint8)
        # Erode then dilate to drop small specks before edge detection.
        erosion = cv.erode(gray, kernel, iterations=1)
        dilation = cv.dilate(erosion, kernel, iterations=1)
        edges = cv.Canny(dilation, 100, 300, apertureSize=3)
        img = auto_crop(edges, img)

    if enhance == "true":
        # Apply dilation and erosion to remove some noise.
        kernel = np.ones((1, 1), np.uint8)
        img = cv.dilate(img, kernel, iterations=1)
        img = cv.erode(img, kernel, iterations=1)

    if bw == "true":
        # Drop colour but keep three channels for the steps that follow.
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
        img = cv.cvtColor(img, cv.COLOR_GRAY2BGR)

    if threshold == "true":
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
        img = apply_threshold(img, 3)

    if blur == "true":
        img = remove_noise(img)

    # Named ``timestamp`` so the module-level ``now`` import is not shadowed.
    timestamp = dt.datetime.now().isoformat()
    local_root, local_ext = os.path.splitext(image_path)
    local_filename = local_root + "_processed_" + timestamp + local_ext
    name_root, name_ext = os.path.splitext(doc.name)
    file_path = name_root + "_processed_" + timestamp + name_ext

    # cv.imwrite reports most failures through its boolean return value.
    if not cv.imwrite(local_filename, img):
        raise IOError("Could not write image: %s" % local_filename)
    return file_path
def apply_threshold(img, argument):
    """Blur ``img`` and binarise it with the numbered method ``argument``.

    Methods:
        1, 2, 3: Gaussian blur (9x9, 7x7, 5x5) followed by Otsu thresholding.
        4, 5:    median blur (aperture 5, 3) followed by Otsu thresholding.
        6:       Gaussian blur 5x5 followed by adaptive Gaussian thresholding.
        7:       median blur 3 followed by adaptive Gaussian thresholding.

    Only the selected method is executed.  The previous dict literal evaluated
    all seven pipelines on every call and threw six of the results away.

    Args:
        img: image passed straight to the OpenCV blur functions.
        argument: method number, 1-7.

    Returns:
        The thresholded image, or the string ``"Invalid method"`` when
        ``argument`` is not a known method number (kept for backward
        compatibility with existing callers).
    """

    def otsu(blurred):
        return cv.threshold(
            blurred, 0, 255, cv.THRESH_BINARY + cv.THRESH_OTSU
        )[1]

    def adaptive(blurred):
        return cv.adaptiveThreshold(
            blurred,
            255,
            cv.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv.THRESH_BINARY,
            31,
            2,
        )

    # Zero-argument callables so nothing runs until a method is chosen.
    methods = {
        1: lambda: otsu(cv.GaussianBlur(img, (9, 9), 0)),
        2: lambda: otsu(cv.GaussianBlur(img, (7, 7), 0)),
        3: lambda: otsu(cv.GaussianBlur(img, (5, 5), 0)),
        4: lambda: otsu(cv.medianBlur(img, 5)),
        5: lambda: otsu(cv.medianBlur(img, 3)),
        6: lambda: adaptive(cv.GaussianBlur(img, (5, 5), 0)),
        7: lambda: adaptive(cv.medianBlur(img, 3)),
    }

    method = methods.get(argument)
    if method is None:
        return "Invalid method"
    return method()
def compute_md5_hash(my_string):
    """Return the hexadecimal MD5 digest of ``my_string`` encoded as UTF-8."""
    encoded = my_string.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
def charge(user, amount=10):
    """Deduct ``amount`` credits from the user's ``Credit`` balance and save.

    Args:
        user: the user whose ``Credit`` row is debited.
        amount: number of credits to deduct; defaults to 10, the value that
            was previously hard-coded.

    Raises:
        Whatever ``Credit.objects.get`` raises when the user has no (or more
        than one) ``Credit`` row.
    """
    credit = Credit.objects.get(user=user)
    # NOTE(review): read-modify-write; concurrent tasks for the same user
    # could lose an update.  Consider an F() expression if that matters.
    credit.balance -= amount
    credit.save()
def update_document(document):
    """Re-fetch ``document`` by id and point its ``doc`` file at the same file.

    Args:
        document: a ``Document`` instance whose ``doc`` file reference should
            be persisted on the stored record.

    Raises:
        Whatever ``Document.objects.get`` raises when no row matches.
    """
    # The lookup must be a keyword filter; a bare positional id is not a
    # valid argument to ``get()``.
    record = Document.objects.get(id=document.id)
    # NOTE(review): assumes ``doc`` is a Django FileField/ImageField.  Its
    # ``path`` is a read-only property derived from ``name``, so the file is
    # re-pointed by assigning ``name`` instead.
    record.doc.name = document.doc.name
    record.save()
def can_extract(user):
    """Return True when the user's ``Credit`` balance is greater than 9."""
    balance = Credit.objects.get(user=user).balance
    return balance > 9
def crop_image(doc, x=0, y=0, w=0, h=0, cw=0, ch=0):
    """Crop the image behind ``doc`` and save the result as a new file.

    ``x``, ``y``, ``w``, ``h``, ``cw`` and ``ch`` are forwarded unchanged to
    ``imcrop``.  The cropped image is written next to ``doc.path`` with
    ``_processed_<ISO timestamp>`` inserted before the extension.

    Returns:
        ``doc.name`` with the same ``_processed_<ISO timestamp>`` suffix
        inserted before its extension.
    """
    source_path = doc.path
    cropped_img = imcrop(cv.imread(source_path), x, y, w, h, cw, ch)

    # One timestamp shared by the on-disk path and the returned name.
    suffix = "_processed_" + dt.datetime.now().isoformat()

    local_root, local_ext = os.path.splitext(source_path)
    cv.imwrite(local_root + suffix + local_ext, cropped_img)

    name_root, name_ext = os.path.splitext(doc.name)
    return name_root + suffix + name_ext
def imcrop(img, x, y, w, h, cw, ch):
    """Crop a rectangle out of ``img``, padding if it leaves the image.

    The rectangle is given in the coordinate system of a canvas of size
    ``cw`` x ``ch`` and is scaled to the real pixel size of ``img``.

    Args:
        img: array indexed as ``[row, column]`` or ``[row, column, channel]``.
        x, y: top-left corner of the rectangle, in canvas units.
        w, h: width and height of the rectangle, in canvas units.
        cw, ch: canvas width and height.  A value of 0 means "no scaling" for
            that axis (previously this raised ``ZeroDivisionError``).
            Presumably this is the size the image was displayed at when the
            rectangle was chosen; confirm against the caller.

    Returns:
        The cropped region.  When the rectangle extends past an image edge the
        image is first padded by ``pad_img_to_fit_bbox`` so the result has the
        requested size.
    """
    cw = int(cw)
    ch = int(ch)
    img_h, img_w = img.shape[0], img.shape[1]
    wp = img_w / cw if cw else 1
    hp = img_h / ch if ch else 1

    # Work with corner coordinates: that is what the padding helper expects
    # and what the bounds check needs (right/bottom edge, not width/height).
    x1 = int(int(x) * wp)
    y1 = int(int(y) * hp)
    x2 = x1 + int(int(w) * wp)
    y2 = y1 + int(int(h) * hp)

    if x1 < 0 or y1 < 0 or x2 > img_w or y2 > img_h:
        img, x1, x2, y1, y2 = pad_img_to_fit_bbox(img, x1, x2, y1, y2)

    # No explicit channel index, so 2-D grayscale images work as well.
    return img[y1:y2, x1:x2]
def pad_img_to_fit_bbox(img, x1, x2, y1, y2):
    """Pad ``img`` so that the box ``[x1, x2) x [y1, y2)`` lies inside it.

    Border pixels are replicated (``cv.BORDER_REPLICATE``) on whichever sides
    the box sticks out.  Because padding on the top/left moves the origin, the
    box coordinates are shifted by the same amount.

    Returns:
        ``(padded_img, x1, x2, y1, y2)`` with the coordinates expressed in the
        padded image.
    """
    rows, cols = img.shape[0], img.shape[1]

    # How far the box extends beyond each edge (0 when it does not).
    top = -min(0, y1)
    bottom = max(y2 - rows, 0)
    left = -min(0, x1)
    right = max(x2 - cols, 0)

    padded = cv.copyMakeBorder(
        img, top, bottom, left, right, cv.BORDER_REPLICATE
    )
    return padded, x1 + left, x2 + left, y1 + top, y2 + top
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment