Created
November 14, 2011 20:25
-
-
Save objcode/1365037 to your computer and use it in GitHub Desktop.
defer to thread in gevent
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent, gevent.event | |
import threading, Queue, collections, time, functools | |
def _threads_poller_f(): | |
while _OsThread._threads_count: | |
try: | |
t, rv, isexc = _OsThread._threads_results.get_nowait() | |
except Queue.Empty: | |
gevent.sleep() | |
else: | |
if isexc: | |
t._async_result.set_exception(rv) | |
else: | |
t._async_result.set(rv) | |
class _OsThread(threading.Thread): | |
_threads_count = 0 | |
_threads_sleep_interval = 0.008 | |
_threads_timeout = 3 | |
_threads_poller = None | |
_threads_results = Queue.Queue() | |
_free_threads = collections.deque() | |
_free_threads_lock = threading.RLock() | |
def __init__(self): | |
"Do not instantiate _OsThread objects directly. Use acquire instead." | |
threading.Thread.__init__(self) | |
self.setDaemon(True) | |
self.stopped = False | |
self.inq = Queue.Queue() | |
def _init(self): | |
self.free = False | |
_OsThread._threads_count += 1 | |
if _OsThread._threads_count == 1: | |
_OsThread._threads_poller = gevent.spawn_raw(_threads_poller_f) | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, tb): | |
self.release() | |
@classmethod | |
def acquire(cls): | |
with _OsThread._free_threads_lock: | |
try: | |
self = _OsThread._free_threads.popleft() | |
self._init() | |
except IndexError: | |
self = cls() | |
self._init() | |
self.start() | |
return self | |
def release(self): | |
with _OsThread._free_threads_lock: | |
if not self.free and not self.stopped: | |
_OsThread._threads_count -= 1 | |
self.free = True | |
_OsThread._free_threads.append(self) | |
return self | |
def stop(self): | |
with _OsThread._free_threads_lock: | |
self.stopped = True | |
if self.free: | |
self.free = False | |
_OsThread._free_threads.remove(self) | |
else: | |
_OsThread._threads_count -= 1 | |
self.inq.put(None) | |
return self | |
# Overridden | |
def run(self): | |
while True: | |
time.sleep(_OsThread._threads_sleep_interval) | |
try: | |
obj = self.inq.get(timeout=_OsThread._threads_timeout) | |
except Queue.Empty: | |
with _OsThread._free_threads_lock: | |
if self.free: | |
self.free = False | |
_OsThread._free_threads.remove(self) | |
break | |
else: | |
continue | |
if obj is None: | |
break | |
else: | |
rv = None | |
isexc = False | |
try: | |
rv = obj.func(*obj.args, **obj.kwargs) | |
except Exception as e: | |
rv = e | |
isexc = True | |
finally: | |
_OsThread._threads_results.put((obj, rv, isexc)) | |
osthread_acquire = _OsThread.acquire | |
class inthread(object): | |
def __new__(cls, f, *args, **kwargs): | |
self = object.__new__(cls) | |
if isinstance(f, _OsThread): | |
self.func = args[0] | |
self.args = args[1:] | |
self.kwargs = kwargs | |
self._async_result = gevent.event.AsyncResult() | |
self.thread = f | |
self.thread.inq.put(self) | |
return self._async_result.get() | |
else: | |
self.func = f | |
self.args = args | |
self.kwargs = kwargs | |
self._async_result = gevent.event.AsyncResult() | |
self.thread = _OsThread.acquire() | |
self.thread.inq.put(self) | |
try: | |
return self._async_result.get() | |
finally: | |
self.thread.release() | |
@classmethod | |
def wrap(cls, func): | |
@functools.wraps(func) | |
def wrapped(*args, **kwargs): | |
return cls(func, *args, **kwargs) | |
return wrapped |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment