Created
August 5, 2013 19:28
-
-
Save calebrob6/6158730 to your computer and use it in GitHub Desktop.
Python threading pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread, Lock | |
from Queue import Queue | |
from datetime import datetime | |
import time | |
import random | |
class Worker(Thread): | |
"""This is the main worker - it will process jobs as long as the "job | |
queue" has jobs available. | |
""" | |
# this lock is used to avoid messing up the screen output - only | |
# one worker will write to screen at a given time. It is | |
# technically a Mutual Exclusion (mutex) | |
screen_mutex = Lock() | |
def __init__(self, queue): | |
# initialize the base class | |
super(Worker, self).__init__() | |
self.queue = queue | |
def log(self, message): | |
"""This convenience function is used to print a message to the | |
screen. You are better off using the logging module, but | |
beware! It is not thread safe (use a server). | |
""" | |
Worker.screen_mutex.acquire() | |
print("{timestamp:%d-%b-%Y %H:%M:%S.%f UTC} " | |
"{name}: {message}".format(timestamp=datetime.utcnow(), | |
name=self.getName(), | |
message=message)) | |
Worker.screen_mutex.release() | |
def run(self): | |
"""This is the method called when you start the thread.""" | |
# The following is an infinite loop which will continue | |
# processing jobs as long as there are jobs available in the | |
# queue | |
while True: | |
# this is how you get a job from the queue - this call | |
# will block until a job is available, or when the parent | |
# thread finishes | |
job = self.queue.get() | |
# in this case the job is simply a random number | |
# indicating how many seconds to sleep (typical example) | |
self.log("sleeping for {0} seconds".format(job)) | |
time.sleep(job) | |
self.log("finished sleeping") | |
# when the job is done, you signal the queue - refer to | |
# the Queue module documentation | |
self.queue.task_done() | |
def main(number_of_jobs=10, number_of_workers=3): | |
# create the queue where you will put the jobs | |
queue = Queue() | |
# create the pool of workers (notice that you pass them the queue | |
# upon construction). | |
for _ in range(number_of_workers): | |
worker = Worker(queue) | |
# you "daemonize" a thread to ensure that the threads will | |
# close when the main program finishes | |
worker.daemon = True | |
worker.start() | |
# now it is time to add the jobs to the queue | |
for _ in range(number_of_jobs): | |
# a random duration between 2 and 5 seconds | |
duration = random.randint(2,5) | |
queue.put(duration) | |
# now wait for all workers to finish - JOIN THE QUEUE | |
queue.join() | |
if __name__ == "__main__": | |
import sys | |
if len(sys.argv) == 3: | |
nj = int(sys.argv[1]) | |
nw = int(sys.argv[2]) | |
else: | |
nj = 10 | |
nw = 3 | |
# call main | |
main(nj, nw) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment