Created
May 9, 2020 16:22
-
-
Save aydinemre/171a86ad3ca567753cb646a251fb6066 to your computer and use it in GitHub Desktop.
Multiprocess image annotation example using the Clarifai object detection API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
import json | |
import os | |
from datetime import datetime | |
from multiprocessing import Pool, cpu_count | |
from pathlib import Path | |
import pandas as pd | |
from PIL import Image | |
from clarifai.rest import ClarifaiApp | |
from clarifai.rest import Model | |
from tqdm import tqdm | |
# Directory layout used by the functions below.
RAW_IMAGE_DIR = 'Data/images/raw'                # input images
CROPPED_IMAGE_DIR = 'Data/images/cropped'        # one output crop per detected region
IMAGE_RESPONSE_DIR = 'Data/clarifai_responses'   # cached JSON API responses

# Prefer a key supplied through the environment so the secret does not have to
# be committed; fall back to the original placeholder for manual substitution.
app = ClarifaiApp(api_key=os.environ.get('CLARIFAI_API_KEY', '#API_KEY#'))
model = Model(app.api, model_id='72c523807f93e18b431676fb9a58e6ad')
def api_call(image_path):
    """Return the prediction response for one image, using an on-disk cache.

    Args:
        image_path: Image file name, relative to ``RAW_IMAGE_DIR``.

    Returns:
        The response loaded from ``IMAGE_RESPONSE_DIR/<name>.json`` when that
        file exists; otherwise the result of ``model.predict_by_filename``,
        which is written to that file before being returned.
    """
    # The cache file is named after the image without its extension.
    filename = os.path.splitext(image_path)[0]
    response_file = os.path.join(IMAGE_RESPONSE_DIR, filename + '.json')

    # Reuse a previously saved response instead of calling the API again.
    if os.path.exists(response_file):
        with open(response_file, 'r') as f:
            return json.load(f)

    # Create the cache directory up front so a missing directory fails (or is
    # fixed) before an API call is spent.
    cache_dir = os.path.dirname(response_file)
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)

    print(datetime.now())
    response = model.predict_by_filename(filename=os.path.join(RAW_IMAGE_DIR, image_path))

    # Write to a per-process temporary file and rename it into place, so an
    # interrupted run never leaves a truncated JSON file that a later
    # ``json.load`` would choke on. A leftover ``.tmp`` file is never read.
    tmp_file = '{}.{}.tmp'.format(response_file, os.getpid())
    with open(tmp_file, 'w') as f:
        json.dump(response, f)
    os.replace(tmp_file, response_file)
    return response
def converter(bounding_box, width, height):
    """Scale a fractional bounding box to ``(left, top, right, bottom)``.

    Args:
        bounding_box: Mapping with ``left_col``, ``top_row``, ``right_col``
            and ``bottom_row`` entries; column values are multiplied by
            ``width`` and row values by ``height``.
        width: Horizontal scale factor (the caller passes the image width).
        height: Vertical scale factor (the caller passes the image height).

    Returns:
        A 4-tuple ``(left, top, right, bottom)`` of the scaled edges.
    """
    # Scale each edge directly; deriving width/height and adding them back to
    # the origin only reintroduces floating-point rounding error.
    return (
        bounding_box['left_col'] * width,
        bounding_box['top_row'] * height,
        bounding_box['right_col'] * width,
        bounding_box['bottom_row'] * height,
    )
def process_response(image_path, response):
    """Crop every region listed in ``response`` out of the image and save it.

    Each crop is written to ``CROPPED_IMAGE_DIR`` as
    ``crop_<image name>_<concept name>_<value>_<index>.jpg``; crops whose
    file already exists are skipped.

    Args:
        image_path: Image file name, relative to ``RAW_IMAGE_DIR``.
        response: Response mapping; regions are read from
            ``response["outputs"][0]["data"]["regions"]``.

    Returns:
        Always ``True``.
    """
    file_name = os.path.splitext(image_path)[0]

    regions = response["outputs"][0]['data'].get('regions', False)
    if not regions:
        # Nothing to crop, so there is no need to open the image at all.
        return True

    # Modes Pillow's JPEG writer accepts; anything else (e.g. RGBA or
    # palette images from PNG sources) must be converted before saving.
    jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')

    os.makedirs(CROPPED_IMAGE_DIR, exist_ok=True)

    # The context manager closes the underlying file handle when done.
    with Image.open(os.path.join(RAW_IMAGE_DIR, image_path)) as im:
        for index, box in enumerate(regions):
            # Convert the fractional bounding box to pixel coordinates.
            bounding_box = converter(
                box['region_info']['bounding_box'],
                width=im.width,
                height=im.height,
            )

            # Build a unique file name from the first concept of the region.
            data = box['data']['concepts'][0]
            crop_file_name = 'crop_{}_{}_{}_{}.jpg'.format(
                file_name,
                data['name'].replace('/', ''),  # '/' would create a sub-path
                '%.5f' % data['value'],
                index,
            )
            crop_path = os.path.join(CROPPED_IMAGE_DIR, crop_file_name)
            if os.path.exists(crop_path):
                continue

            crop = im.crop(bounding_box)
            if crop.mode not in jpeg_modes:
                crop = crop.convert('RGB')
            crop.save(crop_path)
    return True
def image_annotation(image_path):
    """Fetch the response for ``image_path`` and process it.

    Passes the result of ``api_call`` to ``process_response`` and returns
    whatever ``process_response`` returns.
    """
    return process_response(image_path, api_call(image_path))
# Guard the entry point: under the "spawn" start method each worker process
# re-imports this module, and an unguarded Pool would be created again there.
if __name__ == '__main__':
    samples = os.listdir(RAW_IMAGE_DIR)
    print(len(samples))
    # Two worker processes annotate the images in parallel.
    with Pool(2) as p:
        print(p.map(image_annotation, samples))
    print("End")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment