Skip to content

Instantly share code, notes, and snippets.

@kurtbrose
Created August 28, 2012 20:36
Show Gist options
  • Select an option

  • Save kurtbrose/3503955 to your computer and use it in GitHub Desktop.

Select an option

Save kurtbrose/3503955 to your computer and use it in GitHub Desktop.
Simple gevent thread-actor
import gevent.queue
import gevent.threadpool
class ThreadActor(object):
    """Actor that handles queued items one at a time on a single worker.

    Items handed to :meth:`send` are placed on a bounded queue. A worker
    spawned on a one-thread ``gevent.threadpool.ThreadPool`` pulls them off
    in order and passes each to ``process_items``.

    Args:
        process_items: Callable invoked with each queued item.
        process_error: Optional callable invoked as
            ``process_error(item, exc)`` when handling an item raises.
            ``item`` is ``None`` if the failure happened while fetching
            from the queue, before any item was obtained.
        queue_size: Maximum number of pending items held by the queue.

    NOTE(review): the queue is a gevent queue shared between the caller and
    a thread-pool thread -- confirm this is safe on the gevent version in use.
    """

    def __init__(self, process_items, process_error=None, queue_size=4096):
        self.queue = gevent.queue.Queue(queue_size)
        self.pool = gevent.threadpool.ThreadPool(1)
        self.process_items = process_items
        self.process_error = process_error
        # Start the worker last so every attribute it reads already exists.
        self.pool.spawn(self.run)

    def send(self, item):
        """Queue ``item`` for processing without ever blocking the caller.

        Returns:
            True if the item was queued, False if the queue was full and
            the item was dropped.
        """
        # put_nowait avoids the check-then-put race of full() + put(): the
        # queue could fill between the two calls and put() would then block.
        try:
            self.queue.put_nowait(item)
        except gevent.queue.Full:
            return False
        return True

    def run(self):
        """Worker loop: fetch items forever and hand each to the callback.

        Exceptions raised while fetching or processing are routed to
        ``process_error`` (when set) and never terminate the loop.
        """
        while True:
            # Reset per iteration so the error handler never sees an unbound
            # name or a stale item left over from the previous iteration.
            nxt = None
            try:
                nxt = self.queue.get()
                self.process_items(nxt)
            except Exception as e:
                if self.process_error is not None:
                    try:
                        self.process_error(nxt, e)
                    except Exception:
                        # The error handler itself failed. Swallow it so the
                        # only worker stays alive; logging is avoided on
                        # purpose, since even a warning could feed back into
                        # this actor and loop forever.
                        pass
def thread_actor_test():
    """Smoke test: push 1, 2 and 3 through a ThreadActor that echoes to stdout.

    Each item is written to ``sys.stdout`` surrounded by newlines by the
    actor's worker.
    """
    import sys

    def _echo(value):
        # Same output as the inline callback: newline, item text, newline.
        return sys.stdout.write('\n' + str(value) + '\n')

    actor = ThreadActor(_echo)
    for number in (1, 2, 3):
        actor.send(number)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment