Skip to content

Instantly share code, notes, and snippets.

@mrjoes
Created January 22, 2012 16:23
Show Gist options
  • Save mrjoes/1657577 to your computer and use it in GitHub Desktop.
Save mrjoes/1657577 to your computer and use it in GitHub Desktop.
import Queue
import threading
import functools
import traceback
from tornado.ioloop import IOLoop
# Default number of worker threads created by start().
THREAD_LIMIT = 8
# Bounded queue of pending (func, callback, kwargs) task tuples. Its capacity
# is twice THREAD_LIMIT; a plain put() blocks the caller while it is full.
_task_queue = Queue.Queue(THREAD_LIMIT * 2)
# Worker threads created so far by start().
_processors = []
class TaskProcessor(threading.Thread):
    """Daemon worker thread that executes queued tasks off the IOLoop thread.

    Each worker pulls ``(func, callback, kwargs)`` tuples from the module-level
    ``_task_queue``, calls ``func(**kwargs)`` on its own thread and, when a
    callback was supplied, schedules that callback on ``io_loop`` through
    ``add_callback``. The thread starts itself on construction.
    """

    def __init__(self, io_loop):
        """Store *io_loop* and start the worker as a daemon thread.

        :param io_loop: object exposing ``add_callback(callable)``; result
            callbacks are scheduled on it.
        """
        threading.Thread.__init__(self)
        self.io_loop = io_loop
        self.daemon = True
        self.start()

    def run(self):
        """Process tasks forever.

        A failure while handling one task is reported on stderr and the loop
        continues, so one bad task cannot permanently shrink the pool.
        """
        while True:
            task = _task_queue.get()
            try:
                func, callback, kwargs = task
                self._run_task(func, callback, kwargs)
            except Exception:
                # Anything that escapes here is a dispatch problem (malformed
                # task tuple, non-callable callback, add_callback failure).
                # Report it with its traceback and keep the worker alive.
                traceback.print_exc()
            finally:
                # Pair every get() with task_done() so _task_queue.join()
                # can be used to wait for outstanding work.
                _task_queue.task_done()

    def _run_task(self, func, callback, kwargs):
        """Run one task and hand its outcome to *callback* via the IOLoop.

        :param func: callable invoked as ``func(**kwargs)``.
        :param callback: callable or ``None``. When given, it is scheduled as
            ``callback(*results, exception=error)``; it must therefore accept
            an ``exception`` keyword argument.
        :param kwargs: keyword arguments for *func*.
        """
        error = None
        results = None
        try:
            results = func(**kwargs)
        except Exception as exc:
            # Keep the exception under a separate name: Python 3 unbinds the
            # ``as`` target when the except block ends.
            error = exc
            if callback is None:
                # Nobody will receive the error, so at least print it.
                traceback.print_exc()

        if callback is None:
            return

        # Anything that is not a tuple is treated as a single positional
        # argument; a tuple result is spread over several positional arguments.
        if not isinstance(results, tuple):
            results = (results,)

        # The callback runs on the IOLoop, not on this worker thread. When
        # func failed, it receives (None, exception=<the error>).
        self.io_loop.add_callback(
            functools.partial(callback, *results, exception=error))
def start(io_loop=None, count=THREAD_LIMIT):
    """Create *count* worker threads that report back to *io_loop*.

    When *io_loop* is omitted (or falsy) the loop returned by
    ``IOLoop.instance()`` is used. Every new ``TaskProcessor`` starts itself
    in its constructor and is recorded in the module-level ``_processors``
    list.
    """
    if not io_loop:
        io_loop = IOLoop.instance()

    for _index in xrange(count):
        worker = TaskProcessor(io_loop)
        _processors.append(worker)
def run(func, callback, **kwargs):
    """Queue ``func(**kwargs)`` for execution on a worker thread.

    :param func: callable to execute in the background.
    :param callback: callable scheduled on the IOLoop with the result and an
        ``exception`` keyword argument, or ``None`` to discard the result.
    :param kwargs: keyword arguments forwarded to *func*.

    The task queue is bounded, so this call blocks while the queue is full.
    """
    task = (func, callback, kwargs)
    _task_queue.put(task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment