Last active
January 13, 2020 07:13
-
-
Save Jongbhin/d9b8e4bdec71e51d6675121d24f59697 to your computer and use it in GitHub Desktop.
pil and csv image load form url, file or data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
reload(sys) | |
sys.setdefaultencoding('utf-8') | |
import datetime | |
import PIL | |
from PIL import Image | |
from exifutil import exifutil | |
from korean_url_handler import korean_url_handler | |
import werkzeug | |
import collections | |
import urllib2 | |
import cv2 | |
import numpy as np | |
import contextlib | |
import requests | |
from io import BytesIO | |
from tornado.log import access_log, app_log, gen_log | |
class daemon_utils(): | |
def __init__(self, upload_path=None): | |
self.UPLOAD_FOLDER = "../temp_img" | |
if upload_path != None: | |
self.UPLOAD_FOLDER = upload_path | |
print('upload folder:', self.UPLOAD_FOLDER) | |
self.exifutils = exifutil() | |
self.korean_url_handler = korean_url_handler() | |
def gif_to_jpg(self, infile): | |
try: | |
save_path = infile.replace('.gif', '.jpg') | |
im = Image.open(infile) | |
rgb_im = im.convert('RGB') | |
rgb_im.save(save_path) | |
except IOError as e: | |
print "invalid image file :", infile | |
raise e | |
def convert(self, data): | |
if isinstance(data, basestring): | |
return data.encode("utf-8") | |
elif isinstance(data, collections.Mapping): | |
return dict(map(self.convert, data.iteritems())) | |
elif isinstance(data, collections.Iterable): | |
return type(data)(map(self.convert, data)) | |
else: | |
return data | |
def get_directory(self, dir_path): | |
app_log.debug("mkdir : " + dir_path) | |
if not os.path.exists(dir_path): | |
os.makedirs(dir_path) | |
return dir_path | |
def get_downloaded_directory_path(self, file_path, date_time): | |
date_list = [date_time.year, date_time.month, date_time.day] | |
for token in date_list: | |
file_path = self.get_directory(os.path.join(file_path, str(token))) | |
return file_path | |
def download_image(self, image_url, img_ext, file_path='tmp'): | |
try: | |
with contextlib.closing(urllib2.urlopen(image_url, timeout=7)) as request: | |
# request = urllib2.urlopen(image_url, timeout=7) | |
databuf = request.read() | |
# img_array = np.asarray(bytearray(databuf), dtype=np.uint8) | |
filename = self.korean_url_handler.get_downloaded_filename(file_path, img_ext) | |
with open(filename, 'wb') as img_fp: | |
img_fp.write(databuf) | |
img_fp.flush() | |
return filename; | |
except Exception as err: | |
# logging.info('URL Image download error: %s', err) | |
raise err | |
def file_upload(self, uploaded_file): | |
filepath = self.get_downloaded_directory_path(self.UPLOAD_FOLDER, datetime.datetime.now()) | |
fname = str(datetime.datetime.now()).replace(' ', '_') + "." + werkzeug.secure_filename(uploaded_file.filename.split(".")[-1]) | |
filename = filepath + "/" + fname | |
imagefile = open(filename, 'w') | |
imagefile.write(uploaded_file['body']) | |
app_log.debug("file upload : " + filename) | |
return filename | |
def raw_file_upload(self, raw_file): | |
filepath = self.get_downloaded_directory_path(self.UPLOAD_FOLDER, datetime.datetime.now()) | |
fname = str(datetime.datetime.now()).replace(' ', '_') + "." + werkzeug.secure_filename("jpg") | |
filename = filepath + "/" + fname | |
imagefile = open(filename, 'w') | |
imagefile.write(raw_file) | |
app_log.debug("raw file upload : " + filename) | |
return filename | |
def crop_images_with_roi(self, filename, roi): | |
image = PIL.Image.open(filename).crop(roi.astype(np.int)) | |
# import pdb;pdb.set_trace() | |
crop_path = filename + str(datetime.datetime.now())[-8:] + ".cropped.jpg" | |
image.save(crop_path) | |
return crop_path | |
def image_download(self, image_url): | |
try: | |
if image_url.split('.')[-1] == 'gif': | |
try: | |
file_name = self.download_image(image_url, 'gif') | |
if os.path.exists(file_name) == False: | |
raise Exception('download_image') | |
# NOTE: supports for gif format | |
if file_name.endswith('.gif'): | |
self.gif_to_jpg(file_name) | |
file_name = file_name.replace('.gif', '.jpg') | |
image = cv2.imread(file_name, flags=cv2.IMREAD_COLOR) | |
finally: | |
try: | |
os.remove(file_name) | |
if file_name.endswith('.gif'): | |
os.remove(file_name.replace('.gif', '.jpg')) | |
elif file_name.endswith('.jpg'): | |
os.remove(file_name.replace('.jpg', '.gif')) | |
except OSError: | |
pass | |
else: | |
with contextlib.closing(urllib2.urlopen(image_url, timeout=7)) as request: | |
# request = urllib2.urlopen(image_url, timeout=7) | |
databuf = request.read() | |
img_array = np.asarray(bytearray(databuf), dtype=np.uint8) | |
image = cv2.imdecode(img_array, flags=cv2.IMREAD_COLOR) | |
return image | |
except Exception as e: | |
raise e | |
def image_load_and_resize(self, image_path, image_data=None, resize=True): | |
try: | |
if image_data != None: | |
imagenp = np.fromstring(image_data, dtype=np.uint8) | |
image = cv2.imdecode(imagenp, 1) | |
elif os.path.exists(image_path): | |
image = cv2.imread(image_path, flags=cv2.IMREAD_COLOR) | |
if image is None: | |
image_path = self.gif_to_jpg(image_path) | |
image = cv2.imread(image_path, flags=cv2.IMREAD_COLOR) | |
else: | |
image = self.image_download(image_path) | |
img_size = image.shape | |
if resize: | |
image = cv2.resize(image, (200, 200)) | |
image = image.reshape(1, 200, 200, 3) | |
except Exception as e: | |
app_log.info("image load error : %s" % e.message) | |
if resize: | |
image = np.zeros(shape=[1, 200, 200, 3], dtype=np.uint8) | |
else: | |
image = np.zeros(shape=[1, 224, 224, 3], dtype=np.uint8) | |
img_size = [0, 0] | |
return image, img_size | |
def pil_image_load(self, image_path, image_data=None, resize=True): | |
try: | |
if image_data != None: | |
imagenp = np.fromstring(image_data, dtype=np.uint8) | |
image = Image.fromarray(imagenp.astype('uint8'), 'RGB') | |
elif os.path.exists(image_path): | |
image = Image.open(image_path).convert('RGB') | |
else: | |
response = requests.get(image_path) | |
image = Image.open(BytesIO(response.content)).convert('RGB') | |
img_size = image.size | |
if resize: | |
image = image.resize((224, 224), PIL.Image.ANTIALIAS) | |
except Exception as e: | |
print(e) | |
app_log.info("image load error : %s" % e.message) | |
image = np.zeros(shape=[3, 224, 224], dtype=np.uint8) | |
image = Image.fromarray(image.astype('uint8'), 'RGB') | |
img_size = [0, 0] | |
return image, img_size | |
def encode_result_to_json(self, result): | |
try: | |
result_json = { | |
"status": "success", | |
"score": format(result[0], ".8f"), | |
"org_size": {"height": result[1][0], "width": result[1][1]} | |
} | |
if result[-1] != None: | |
result_json['prod_no'] = result[-1] | |
return result_json | |
except Exception as e: | |
app_log.error("Encode result error : %s" % e.message) | |
raise e | |
def encode_result_to_json_v2_api(self, result): | |
try: | |
keys = ['blur', 'area_ratio', 'center_position', 'bg', 'boundary', 'clean', 'logo', 'text'] | |
score = dict(zip(keys, result[0])) | |
result_json = { | |
"status": "success", | |
"score": score, | |
"org_size": {"height": result[1][1], "width": result[1][0]} | |
} | |
if result[-1] != None: | |
result_json['prod_no'] = result[-1] | |
return result_json | |
except Exception as e: | |
app_log.error("Encode result error : %s" % e.message) | |
raise e | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment