Skip to content

Instantly share code, notes, and snippets.

@Jongbhin
Last active January 13, 2020 07:13
Show Gist options
  • Save Jongbhin/d9b8e4bdec71e51d6675121d24f59697 to your computer and use it in GitHub Desktop.
Save Jongbhin/d9b8e4bdec71e51d6675121d24f59697 to your computer and use it in GitHub Desktop.
pil and csv image load form url, file or data
import os
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import datetime
import PIL
from PIL import Image
from exifutil import exifutil
from korean_url_handler import korean_url_handler
import werkzeug
import collections
import urllib2
import cv2
import numpy as np
import contextlib
import requests
from io import BytesIO
from tornado.log import access_log, app_log, gen_log
class daemon_utils():
def __init__(self, upload_path=None):
self.UPLOAD_FOLDER = "../temp_img"
if upload_path != None:
self.UPLOAD_FOLDER = upload_path
print('upload folder:', self.UPLOAD_FOLDER)
self.exifutils = exifutil()
self.korean_url_handler = korean_url_handler()
def gif_to_jpg(self, infile):
try:
save_path = infile.replace('.gif', '.jpg')
im = Image.open(infile)
rgb_im = im.convert('RGB')
rgb_im.save(save_path)
except IOError as e:
print "invalid image file :", infile
raise e
def convert(self, data):
if isinstance(data, basestring):
return data.encode("utf-8")
elif isinstance(data, collections.Mapping):
return dict(map(self.convert, data.iteritems()))
elif isinstance(data, collections.Iterable):
return type(data)(map(self.convert, data))
else:
return data
def get_directory(self, dir_path):
app_log.debug("mkdir : " + dir_path)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
return dir_path
def get_downloaded_directory_path(self, file_path, date_time):
date_list = [date_time.year, date_time.month, date_time.day]
for token in date_list:
file_path = self.get_directory(os.path.join(file_path, str(token)))
return file_path
def download_image(self, image_url, img_ext, file_path='tmp'):
try:
with contextlib.closing(urllib2.urlopen(image_url, timeout=7)) as request:
# request = urllib2.urlopen(image_url, timeout=7)
databuf = request.read()
# img_array = np.asarray(bytearray(databuf), dtype=np.uint8)
filename = self.korean_url_handler.get_downloaded_filename(file_path, img_ext)
with open(filename, 'wb') as img_fp:
img_fp.write(databuf)
img_fp.flush()
return filename;
except Exception as err:
# logging.info('URL Image download error: %s', err)
raise err
def file_upload(self, uploaded_file):
filepath = self.get_downloaded_directory_path(self.UPLOAD_FOLDER, datetime.datetime.now())
fname = str(datetime.datetime.now()).replace(' ', '_') + "." + werkzeug.secure_filename(uploaded_file.filename.split(".")[-1])
filename = filepath + "/" + fname
imagefile = open(filename, 'w')
imagefile.write(uploaded_file['body'])
app_log.debug("file upload : " + filename)
return filename
def raw_file_upload(self, raw_file):
filepath = self.get_downloaded_directory_path(self.UPLOAD_FOLDER, datetime.datetime.now())
fname = str(datetime.datetime.now()).replace(' ', '_') + "." + werkzeug.secure_filename("jpg")
filename = filepath + "/" + fname
imagefile = open(filename, 'w')
imagefile.write(raw_file)
app_log.debug("raw file upload : " + filename)
return filename
def crop_images_with_roi(self, filename, roi):
image = PIL.Image.open(filename).crop(roi.astype(np.int))
# import pdb;pdb.set_trace()
crop_path = filename + str(datetime.datetime.now())[-8:] + ".cropped.jpg"
image.save(crop_path)
return crop_path
def image_download(self, image_url):
try:
if image_url.split('.')[-1] == 'gif':
try:
file_name = self.download_image(image_url, 'gif')
if os.path.exists(file_name) == False:
raise Exception('download_image')
# NOTE: supports for gif format
if file_name.endswith('.gif'):
self.gif_to_jpg(file_name)
file_name = file_name.replace('.gif', '.jpg')
image = cv2.imread(file_name, flags=cv2.IMREAD_COLOR)
finally:
try:
os.remove(file_name)
if file_name.endswith('.gif'):
os.remove(file_name.replace('.gif', '.jpg'))
elif file_name.endswith('.jpg'):
os.remove(file_name.replace('.jpg', '.gif'))
except OSError:
pass
else:
with contextlib.closing(urllib2.urlopen(image_url, timeout=7)) as request:
# request = urllib2.urlopen(image_url, timeout=7)
databuf = request.read()
img_array = np.asarray(bytearray(databuf), dtype=np.uint8)
image = cv2.imdecode(img_array, flags=cv2.IMREAD_COLOR)
return image
except Exception as e:
raise e
def image_load_and_resize(self, image_path, image_data=None, resize=True):
try:
if image_data != None:
imagenp = np.fromstring(image_data, dtype=np.uint8)
image = cv2.imdecode(imagenp, 1)
elif os.path.exists(image_path):
image = cv2.imread(image_path, flags=cv2.IMREAD_COLOR)
if image is None:
image_path = self.gif_to_jpg(image_path)
image = cv2.imread(image_path, flags=cv2.IMREAD_COLOR)
else:
image = self.image_download(image_path)
img_size = image.shape
if resize:
image = cv2.resize(image, (200, 200))
image = image.reshape(1, 200, 200, 3)
except Exception as e:
app_log.info("image load error : %s" % e.message)
if resize:
image = np.zeros(shape=[1, 200, 200, 3], dtype=np.uint8)
else:
image = np.zeros(shape=[1, 224, 224, 3], dtype=np.uint8)
img_size = [0, 0]
return image, img_size
def pil_image_load(self, image_path, image_data=None, resize=True):
try:
if image_data != None:
imagenp = np.fromstring(image_data, dtype=np.uint8)
image = Image.fromarray(imagenp.astype('uint8'), 'RGB')
elif os.path.exists(image_path):
image = Image.open(image_path).convert('RGB')
else:
response = requests.get(image_path)
image = Image.open(BytesIO(response.content)).convert('RGB')
img_size = image.size
if resize:
image = image.resize((224, 224), PIL.Image.ANTIALIAS)
except Exception as e:
print(e)
app_log.info("image load error : %s" % e.message)
image = np.zeros(shape=[3, 224, 224], dtype=np.uint8)
image = Image.fromarray(image.astype('uint8'), 'RGB')
img_size = [0, 0]
return image, img_size
def encode_result_to_json(self, result):
try:
result_json = {
"status": "success",
"score": format(result[0], ".8f"),
"org_size": {"height": result[1][0], "width": result[1][1]}
}
if result[-1] != None:
result_json['prod_no'] = result[-1]
return result_json
except Exception as e:
app_log.error("Encode result error : %s" % e.message)
raise e
def encode_result_to_json_v2_api(self, result):
try:
keys = ['blur', 'area_ratio', 'center_position', 'bg', 'boundary', 'clean', 'logo', 'text']
score = dict(zip(keys, result[0]))
result_json = {
"status": "success",
"score": score,
"org_size": {"height": result[1][1], "width": result[1][0]}
}
if result[-1] != None:
result_json['prod_no'] = result[-1]
return result_json
except Exception as e:
app_log.error("Encode result error : %s" % e.message)
raise e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment