Created November 28, 2016 20:14
Save Nelsonochoam/1cf95bf8164a24bb2c159568516ba389 to your computer and use it in GitHub Desktop.
Python threaded pipeline execution example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import shutil
import tempfile
import threading
try:
    from Queue import Queue, Empty  # Python 2
except ImportError:
    from queue import Queue, Empty  # Python 3

import numpy as np
import cv2
from PIL import Image
import boto
from boto.s3.connection import S3Connection
import boto3
def tag_night_mode(): | |
""" | |
Check the images and tag them as night mode if they are night mode | |
""" | |
dynamodb = boto3.resource('dynamodb', region_name='us-west-2') | |
table = dynamodb.Table('CVGroundTruth') | |
images = get_processed_images(table) | |
AWS_CONNECTIONS = S3Connection() | |
BUCKET = AWS_CONNECTIONS.get_bucket('cvimagesets') | |
DOWNLOAD_DIR = '/tmp' | |
# Flags to control the flow of the thread execution | |
start_classification = threading.Event() | |
start_upload = threading.Event() | |
################# | |
# Worker methods | |
################# | |
def download(downloadQ, classifyQ): | |
""" | |
Method executed by a thread to download an image so that it can be | |
checked for night mode and re uploaded | |
""" | |
# Loop so that the thread does not stop | |
while True: | |
try: | |
key = BUCKET.get_key(downloadQ.get_nowait()) | |
dir_name = '/'.join(key.name.split('/')[:-1]) | |
if not os.path.exists(os.path.join(DOWNLOAD_DIR, dir_name)): | |
os.makedirs(os.path.join(DOWNLOAD_DIR, dir_name)) | |
# Download image | |
path = os.path.join(DOWNLOAD_DIR, key.name) | |
key.get_contents_to_filename(path) | |
# Put the path of the downloaded image into the classify | |
# queue so that it could get classified as night or day mode | |
classifyQ.put((key.name, path)) | |
# Indicate that there is a first task on the classificiation | |
# queue so that threads can be started | |
if not start_classification.is_set(): | |
start_classification.set() | |
downloadQ.task_done() | |
except Empty: | |
# Stop the download threads once the download queue is empty | |
break | |
except Exception as e: | |
# Any other error downloading the image, skip it | |
print "Failed to download image Ex: {}".format(e) | |
downloadQ.task_done() | |
def classify(classifyQ, uploadQ): | |
""" | |
Method used by a thread to classify an image that has been | |
downloaded and update the document on dynamodb | |
""" | |
while True: | |
try: | |
# Wait for a new item on the queue until a new one is available | |
# for classification. Note this will block the thread for | |
# 10 seconds waiting for a classificaiton task to become | |
# available | |
key, path = classifyQ.get(timeout=10) | |
# Open the stored image and check if ti was captured on | |
# day or night | |
img = Image.open(path) | |
threshold = 2 | |
img_arr = np.array(img) | |
b, g, r = cv2.split(img_arr) | |
diff = cv2.absdiff(b, g) | |
metric = sum(diff.flatten())/( | |
img_arr.shape[0] * img_arr.shape[1]) | |
# Put a new task on the upload queue so that the upload | |
# threads cosume it and update the dynamo db entry | |
night_mode = metric < threshold | |
uploadQ.put((key, night_mode.item())) | |
# Indicate that there are tasks on the upload queue so that | |
# the upload threads are started | |
if not start_upload.is_set(): | |
start_upload.set() | |
# Delete the file from the folder | |
os.remove(path) | |
classifyQ.task_done() | |
except Empty: | |
break | |
except Exception as e: | |
print "Failed to classify image Ex: {}".format(e) | |
classifyQ.task_done() | |
def upload(uploadQ): | |
""" | |
Upload the document of the image into dynamodb once the image has | |
been classified | |
""" | |
while True: | |
try: | |
# Wait for a new item to become available for upload. Note | |
# this will block the thread for 10 seconds waiting for a | |
# upload task to become available | |
key, night_mode = uploadQ.get(timeout=10) | |
# Update the document entry on dynamodb | |
table.update_item( | |
Key={'image_key': key}, | |
UpdateExpression="SET night_mode = :n", | |
ExpressionAttributeValues={":n": night_mode}, | |
ReturnValues="UPDATED_NEW" | |
) | |
uploadQ.task_done() | |
except Empty: | |
break | |
except Exception as e: | |
print "Failed to update document Ex: {}".format(e) | |
uploadQ.task_done() | |
def start_workers(size, target, *args): | |
""" | |
Method to start the workers on a target | |
""" | |
for i in range(size): | |
worker = threading.Thread(target=target, args=args) | |
# Set as deamon so that they die when main thread is done | |
worker.daemon = True | |
worker.start() | |
# Number of threads to be used in each stage of the pipeline | |
STAGE_THREAD_POOL_SIZE = 4 | |
# Build the queues that the thread should process | |
downloadQ = Queue() | |
classifyQ = Queue() | |
uploadQ = Queue() | |
# Build the download queue | |
[downloadQ.put(doc['image_key']) for doc in images] | |
# Start the threads for donwloads | |
start_workers(STAGE_THREAD_POOL_SIZE, download, downloadQ, classifyQ) | |
# Wait for the classifyQ queue to have a task on it before starting the | |
# classification threads | |
start_classification.wait() | |
start_workers(STAGE_THREAD_POOL_SIZE, classify, classifyQ, uploadQ) | |
# Wait for the uploadQ to have a task before starting the threads | |
start_upload.wait() | |
start_workers(STAGE_THREAD_POOL_SIZE, upload, uploadQ) | |
# Wait for all the queues to be processed | |
downloadQ.join() | |
classifyQ.join() | |
uploadQ.join() | |
# Delete the tmp folder | |
shutil.rmtree(os.path.join(DOWNLOAD_DIR, 'test_set_mturk')) |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.