Skip to content

Instantly share code, notes, and snippets.

@Nelsonochoam
Created November 28, 2016 20:14
Show Gist options
  • Save Nelsonochoam/1cf95bf8164a24bb2c159568516ba389 to your computer and use it in GitHub Desktop.
Save Nelsonochoam/1cf95bf8164a24bb2c159568516ba389 to your computer and use it in GitHub Desktop.
Python threaded pipeline execution example
import os
import shutil
import threading
from Queue import Queue, Empty

import boto
import boto3
import cv2
import numpy as np
from boto.s3.connection import S3Connection
from PIL import Image
def tag_night_mode():
    """
    Check the processed images and tag each one as night mode or day mode.

    The work runs as a three stage threaded pipeline:

    1. download: fetch each image from the 'cvimagesets' S3 bucket into
       the local download directory.
    2. classify: compare the first two colour channels of the image; when
       their mean absolute difference is below a threshold the image is
       considered night mode.
    3. upload: store the resulting ``night_mode`` flag on the matching
       item of the 'CVGroundTruth' DynamoDB table.

    All stages are started up front. A stage is told to stop by a sentinel
    that is queued only after every worker of the previous stage has
    finished, so the pipeline terminates even when there are no images or
    when every image fails in some stage.
    """
    region_name = 'us-west-2'
    table_name = 'CVGroundTruth'

    dynamodb = boto3.resource('dynamodb', region_name=region_name)
    table = dynamodb.Table(table_name)
    images = get_processed_images(table)

    connection = S3Connection()
    bucket = connection.get_bucket('cvimagesets')
    download_dir = '/tmp'

    # Number of threads to be used in each stage of the pipeline
    stage_thread_pool_size = 4

    # Unique marker queued once per worker to tell a stage that no more
    # work will arrive
    stop = object()

    #################
    # Worker methods
    #################
    def download(downloadQ, classifyQ):
        """
        Method executed by a thread to download images so that they can
        be checked for night mode.

        Runs until the download queue is empty. Every image that was
        downloaded is handed to the classify queue as (key name, path).
        """
        while True:
            try:
                image_key = downloadQ.get_nowait()
            except Empty:
                # The download queue is completely filled before the
                # workers start, so empty means there is nothing left
                break

            try:
                key = bucket.get_key(image_key)
                if key is None:
                    raise KeyError(
                        'S3 key not found: {}'.format(image_key))

                path = os.path.join(download_dir, key.name)
                target_dir = os.path.dirname(path)
                try:
                    os.makedirs(target_dir)
                except OSError:
                    # Another download thread may have created the folder
                    # in the meantime; only a real failure is an error
                    if not os.path.isdir(target_dir):
                        raise

                key.get_contents_to_filename(path)
                classifyQ.put((key.name, path))
            except Exception as e:
                # Any error downloading the image, skip it
                print("Failed to download image Ex: {}".format(e))

    def classify(classifyQ, uploadQ):
        """
        Method used by a thread to classify downloaded images as night
        or day mode.

        Blocks on the classify queue until the stop marker is received.
        The downloaded file is deleted whether or not the classification
        succeeded.
        """
        threshold = 2
        while True:
            task = classifyQ.get()
            if task is stop:
                break

            key, path = task
            try:
                # Normalise to three channels so that grayscale or RGBA
                # files can be split the same way as RGB ones
                img_arr = np.array(Image.open(path).convert('RGB'))

                # PIL arrays are RGB ordered, so the first two channels
                # are red and green
                red, green, _blue = cv2.split(img_arr)
                diff = cv2.absdiff(red, green)

                # Mean absolute difference per pixel between the channels
                night_mode = bool(diff.mean() < threshold)
                uploadQ.put((key, night_mode))
            except Exception as e:
                print("Failed to classify image Ex: {}".format(e))
            finally:
                # Delete the file from the folder
                try:
                    os.remove(path)
                except OSError as e:
                    print("Failed to delete file {} Ex: {}".format(path, e))

    def upload(uploadQ):
        """
        Update the document of each classified image on dynamodb.

        Blocks on the upload queue until the stop marker is received.
        """
        # boto3 documents resources as not thread safe, so every upload
        # thread works with a table object built from its own session
        worker_table = boto3.session.Session().resource(
            'dynamodb', region_name=region_name).Table(table_name)

        while True:
            task = uploadQ.get()
            if task is stop:
                break

            key, night_mode = task
            try:
                worker_table.update_item(
                    Key={'image_key': key},
                    UpdateExpression="SET night_mode = :n",
                    ExpressionAttributeValues={":n": night_mode},
                    ReturnValues="UPDATED_NEW"
                )
            except Exception as e:
                print("Failed to update document Ex: {}".format(e))

    def start_workers(size, target, *args):
        """
        Start ``size`` threads running ``target(*args)`` and return them.
        """
        workers = []
        for _ in range(size):
            worker = threading.Thread(target=target, args=args)
            # Set as daemon so that they die when main thread is done
            worker.daemon = True
            worker.start()
            workers.append(worker)
        return workers

    def finish_stage(workers, next_queue=None, next_workers=()):
        """
        Wait for ``workers`` to end, then tell the next stage to stop.
        """
        for worker in workers:
            worker.join()
        # One stop marker per worker of the following stage
        for _ in next_workers:
            next_queue.put(stop)

    # Build the queues that the threads should process
    downloadQ = Queue()
    classifyQ = Queue()
    uploadQ = Queue()

    # Build the download queue
    for doc in images:
        downloadQ.put(doc['image_key'])

    # Start every stage right away; the later stages simply block on
    # their queue until work or the stop marker arrives
    downloaders = start_workers(
        stage_thread_pool_size, download, downloadQ, classifyQ)
    classifiers = start_workers(
        stage_thread_pool_size, classify, classifyQ, uploadQ)
    uploaders = start_workers(stage_thread_pool_size, upload, uploadQ)

    # Shut the stages down in order once the previous one is done
    finish_stage(downloaders, classifyQ, classifiers)
    finish_stage(classifiers, uploadQ, uploaders)
    finish_stage(uploaders)

    # Delete the tmp folder; it does not exist when nothing was downloaded
    shutil.rmtree(
        os.path.join(download_dir, 'test_set_mturk'), ignore_errors=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment