-
-
Save pipoket/1393002 to your computer and use it in GitHub Desktop.
gevent getthreading
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent, gevent.event | |
import threading, Queue, collections, time, functools | |
def _threads_poller_f(): | |
while _OsThread._threads_count: | |
try: | |
t, rv, isexc = _OsThread._threads_results.get_nowait() | |
except Queue.Empty: | |
gevent.sleep() | |
else: | |
if isexc: | |
t._async_result.set_exception(rv) | |
else: | |
t._async_result.set(rv) | |
class _OsThread(threading.Thread): | |
_threads_count = 0 | |
_threads_sleep_interval = 0.008 | |
_threads_timeout = 3 | |
_threads_poller = None | |
_threads_results = Queue.Queue() | |
_free_threads = collections.deque() | |
_free_threads_lock = threading.RLock() | |
def __init__(self): | |
"Do not instantiate _OsThread objects directly. Use acquire instead." | |
threading.Thread.__init__(self) | |
self.setDaemon(True) | |
self.stopped = False | |
self.inq = Queue.Queue() | |
def _init(self): | |
self.free = False | |
_OsThread._threads_count += 1 | |
if _OsThread._threads_count == 1: | |
_OsThread._threads_poller = gevent.spawn_raw(_threads_poller_f) | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, tb): | |
self.release() | |
@classmethod | |
def acquire(cls): | |
with _OsThread._free_threads_lock: | |
try: | |
self = _OsThread._free_threads.popleft() | |
self._init() | |
except IndexError: | |
self = cls() | |
self._init() | |
self.start() | |
return self | |
def release(self): | |
with _OsThread._free_threads_lock: | |
if not self.free and not self.stopped: | |
_OsThread._threads_count -= 1 | |
self.free = True | |
_OsThread._free_threads.append(self) | |
return self | |
def stop(self): | |
with _OsThread._free_threads_lock: | |
self.stopped = True | |
if self.free: | |
self.free = False | |
_OsThread._free_threads.remove(self) | |
else: | |
_OsThread._threads_count -= 1 | |
self.inq.put(None) | |
return self | |
# Overridden | |
def run(self): | |
while True: | |
time.sleep(_OsThread._threads_sleep_interval) | |
try: | |
obj = self.inq.get(timeout=_OsThread._threads_timeout) | |
except Queue.Empty: | |
with _OsThread._free_threads_lock: | |
if self.free: | |
self.free = False | |
_OsThread._free_threads.remove(self) | |
break | |
else: | |
continue | |
if obj is None: | |
break | |
else: | |
rv = None | |
isexc = False | |
try: | |
rv = obj.func(*obj.args, **obj.kwargs) | |
except Exception as e: | |
rv = e | |
isexc = True | |
finally: | |
_OsThread._threads_results.put((obj, rv, isexc)) | |
osthread_acquire = _OsThread.acquire | |
class inthread(object): | |
def __new__(cls, f, *args, **kwargs): | |
self = object.__new__(cls) | |
if isinstance(f, _OsThread): | |
self.func = args[0] | |
self.args = args[1:] | |
self.kwargs = kwargs | |
self._async_result = gevent.event.AsyncResult() | |
self.thread = f | |
self.thread.inq.put(self) | |
return self._async_result.get() | |
else: | |
self.func = f | |
self.args = args | |
self.kwargs = kwargs | |
self._async_result = gevent.event.AsyncResult() | |
self.thread = _OsThread.acquire() | |
self.thread.inq.put(self) | |
try: | |
return self._async_result.get() | |
finally: | |
self.thread.release() | |
@classmethod | |
def wrap(cls, func): | |
@functools.wraps(func) | |
def wrapped(*args, **kwargs): | |
return cls(func, *args, **kwargs) | |
return wrapped |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gethreading | |
import gevent | |
import time, random, threading | |
def get_running_threads(): | |
return set(th for th in threading.enumerate() if th.name not in ('MainThread', 'SockThread')) | |
def wait(secs): | |
time.sleep(secs) | |
return gethreading.threading.currentThread() | |
def f(x): | |
print "f({}) start".format(x) | |
t1 = gethreading.inthread(wait, random.random()) | |
print "f({}) middle".format(x) | |
t2 = gethreading.inthread(wait, random.random()) | |
## with osthread_acquire() as t: | |
## t1 = inthread(t, wait, random.random()) | |
## t2 = inthread(t, wait, random.random()) | |
print "f({}) end [t1={}, t2={}]".format(x, t1.name, t2.name) | |
jobs = [gevent.spawn(f, x) for x in 'abcde'] | |
gevent.joinall(jobs) | |
##print "---" | |
## | |
##_rt = set() | |
##while True: | |
## rt = get_running_threads() | |
## if not rt: | |
## break | |
## if rt != _rt: | |
## print "Running threads: {}".format(', '.join(t.name for t in rt)) | |
## _rt = rt | |
## time.sleep(0.008) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment