Skip to content

Instantly share code, notes, and snippets.

@objcode
Created November 14, 2011 20:25
Show Gist options
  • Save objcode/1365037 to your computer and use it in GitHub Desktop.
Save objcode/1365037 to your computer and use it in GitHub Desktop.
defer to thread in gevent
import gevent, gevent.event
import threading, Queue, collections, time, functools
def _threads_poller_f():
    """Deliver finished thread jobs to the greenlets waiting on them.

    Runs for as long as ``_OsThread._threads_count`` is non-zero.  Each
    pass takes one ``(job, value, failed)`` triple off the shared results
    queue without blocking and resolves the job's ``_async_result`` with
    either the value or the exception.  When the queue is empty the
    greenlet yields via ``gevent.sleep()`` and then polls again.
    """
    while _OsThread._threads_count:
        try:
            job, value, failed = _OsThread._threads_results.get_nowait()
        except Queue.Empty:
            # Nothing finished yet: let other greenlets run, then re-poll.
            gevent.sleep()
            continue
        outcome = job._async_result
        deliver = outcome.set_exception if failed else outcome.set
        deliver(value)
class _OsThread(threading.Thread):
    """Pooled daemon OS thread that executes jobs taken from its inbox.

    A job is any object exposing ``func``, ``args`` and ``kwargs``.  The
    thread calls ``func(*args, **kwargs)`` and posts
    ``(job, result_or_exception, is_exception)`` on the shared
    ``_threads_results`` queue.

    Obtain instances with :meth:`acquire` and hand them back with
    :meth:`release` (or use the instance as a context manager, which
    releases on exit).  A released thread that stays idle retires itself;
    :meth:`stop` retires a thread explicitly.
    """

    # Number of threads currently acquired (neither pooled nor stopped).
    # The poller greenlet is spawned when this becomes 1.
    _threads_count = 0
    # Seconds a worker sleeps before each wait on its inbox.
    _threads_sleep_interval = 0.008
    # Seconds a worker waits on its inbox before checking whether it is
    # pooled and may therefore retire.
    _threads_timeout = 3
    # Handle returned by gevent.spawn_raw for the most recent poller.
    _threads_poller = None
    # Shared queue of (job, value, is_exception) triples.
    _threads_results = Queue.Queue()
    # Pool of released threads available for reuse.
    _free_threads = collections.deque()
    # Guards the pool, the free/stopped flags and _threads_count.
    _free_threads_lock = threading.RLock()

    def __init__(self):
        """Do not instantiate _OsThread objects directly. Use acquire instead."""
        threading.Thread.__init__(self)
        self.daemon = True
        self.stopped = False
        self.inq = Queue.Queue()

    def _init(self):
        """Mark the thread as in use and start the poller if it is the first."""
        self.free = False
        _OsThread._threads_count += 1
        if _OsThread._threads_count == 1:
            _OsThread._threads_poller = gevent.spawn_raw(_threads_poller_f)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.release()

    @classmethod
    def acquire(cls):
        """Return a running thread, reusing a pooled one when available."""
        with _OsThread._free_threads_lock:
            try:
                thread = _OsThread._free_threads.popleft()
            except IndexError:
                # Pool is empty: create and start a fresh worker.
                thread = cls()
                thread._init()
                thread.start()
            else:
                thread._init()
            return thread

    def release(self):
        """Return the thread to the pool; no-op if already pooled or stopped.

        Returns ``self``.
        """
        with _OsThread._free_threads_lock:
            if not self.free and not self.stopped:
                _OsThread._threads_count -= 1
                self.free = True
                _OsThread._free_threads.append(self)
        return self

    def stop(self):
        """Retire the thread; safe to call more than once.

        A pooled thread is removed from the pool; an acquired one gives up
        its slot in ``_threads_count``.  A ``None`` sentinel is queued so
        the worker loop exits.  Returns ``self``.
        """
        with _OsThread._free_threads_lock:
            if self.stopped:
                # Already retired (by an earlier stop() or by idle timeout):
                # adjusting the count again would drive it negative.
                return self
            self.stopped = True
            if self.free:
                self.free = False
                _OsThread._free_threads.remove(self)
            else:
                _OsThread._threads_count -= 1
        self.inq.put(None)
        return self

    # Overridden
    def run(self):
        """Worker loop: execute queued jobs until stopped or idle in the pool."""
        while True:
            time.sleep(_OsThread._threads_sleep_interval)
            try:
                obj = self.inq.get(timeout=_OsThread._threads_timeout)
            except Queue.Empty:
                with _OsThread._free_threads_lock:
                    if self.free:
                        # Idle while pooled: retire.  Flag as stopped so a
                        # late release()/stop() cannot re-pool this dead
                        # thread or decrement the active count again.
                        self.free = False
                        self.stopped = True
                        _OsThread._free_threads.remove(self)
                        break
                # Still acquired by someone; keep waiting for work.
                continue
            if obj is None:
                # Sentinel queued by stop().
                break
            rv = None
            isexc = False
            try:
                rv = obj.func(*obj.args, **obj.kwargs)
            except Exception as e:
                rv = e
                isexc = True
            finally:
                # NOTE: an error that is not an Exception subclass skips the
                # except clause, so (obj, None, False) is posted before it
                # propagates and ends this thread.
                _OsThread._threads_results.put((obj, rv, isexc))
# Module-level alias for _OsThread.acquire.
osthread_acquire = _OsThread.acquire
class inthread(object):
    """Run a callable on an OS thread and wait for its outcome.

    ``inthread(func, *args, **kwargs)`` acquires a pooled ``_OsThread``,
    queues the call on it, waits on an ``AsyncResult`` and releases the
    thread afterwards.

    ``inthread(thread, func, *args, **kwargs)`` queues the call on the
    given ``_OsThread`` instead and does not release it.

    "Instantiating" the class yields whatever ``AsyncResult.get()``
    produces rather than an ``inthread`` object; the object itself only
    serves as the job record placed on the thread's inbox.
    """

    def __new__(cls, f, *args, **kwargs):
        job = object.__new__(cls)
        borrowed = isinstance(f, _OsThread)
        if borrowed:
            # Caller supplied the thread; the callable is the next argument.
            job.func = args[0]
            job.args = args[1:]
        else:
            job.func = f
            job.args = args
        job.kwargs = kwargs
        job._async_result = gevent.event.AsyncResult()

        if borrowed:
            job.thread = f
            job.thread.inq.put(job)
            return job._async_result.get()

        job.thread = _OsThread.acquire()
        job.thread.inq.put(job)
        try:
            return job._async_result.get()
        finally:
            # Hand the thread back whether get() returned or raised.
            job.thread.release()

    @classmethod
    def wrap(cls, func):
        """Return a function that runs ``func`` through ``cls`` on each call."""
        def runner(*args, **kwargs):
            return cls(func, *args, **kwargs)
        return functools.update_wrapper(runner, func)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment