Skip to content

Instantly share code, notes, and snippets.

@calebrob6
Created August 5, 2013 19:28
Show Gist options
  • Save calebrob6/6158730 to your computer and use it in GitHub Desktop.
Save calebrob6/6158730 to your computer and use it in GitHub Desktop.
Python threading pattern
from threading import Thread, Lock
from Queue import Queue
from datetime import datetime
import time
import random
class Worker(Thread):
"""This is the main worker - it will process jobs as long as the "job
queue" has jobs available.
"""
# this lock is used to avoid messing up the screen output - only
# one worker will write to screen at a given time. It is
# technically a Mutual Exclusion (mutex)
screen_mutex = Lock()
def __init__(self, queue):
# initialize the base class
super(Worker, self).__init__()
self.queue = queue
def log(self, message):
"""This convenience function is used to print a message to the
screen. You are better off using the logging module, but
beware! It is not thread safe (use a server).
"""
Worker.screen_mutex.acquire()
print("{timestamp:%d-%b-%Y %H:%M:%S.%f UTC} "
"{name}: {message}".format(timestamp=datetime.utcnow(),
name=self.getName(),
message=message))
Worker.screen_mutex.release()
def run(self):
"""This is the method called when you start the thread."""
# The following is an infinite loop which will continue
# processing jobs as long as there are jobs available in the
# queue
while True:
# this is how you get a job from the queue - this call
# will block until a job is available, or when the parent
# thread finishes
job = self.queue.get()
# in this case the job is simply a random number
# indicating how many seconds to sleep (typical example)
self.log("sleeping for {0} seconds".format(job))
time.sleep(job)
self.log("finished sleeping")
# when the job is done, you signal the queue - refer to
# the Queue module documentation
self.queue.task_done()
def main(number_of_jobs=10, number_of_workers=3):
# create the queue where you will put the jobs
queue = Queue()
# create the pool of workers (notice that you pass them the queue
# upon construction).
for _ in range(number_of_workers):
worker = Worker(queue)
# you "daemonize" a thread to ensure that the threads will
# close when the main program finishes
worker.daemon = True
worker.start()
# now it is time to add the jobs to the queue
for _ in range(number_of_jobs):
# a random duration between 2 and 5 seconds
duration = random.randint(2,5)
queue.put(duration)
# now wait for all workers to finish - JOIN THE QUEUE
queue.join()
if __name__ == "__main__":
import sys
if len(sys.argv) == 3:
nj = int(sys.argv[1])
nw = int(sys.argv[2])
else:
nj = 10
nw = 3
# call main
main(nj, nw)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment