Skip to content

Instantly share code, notes, and snippets.

@tawateer
Last active October 10, 2015 11:02
Show Gist options
  • Save tawateer/e6dd54492f39758c83ad to your computer and use it in GitHub Desktop.
Save tawateer/e6dd54492f39758c83ad to your computer and use it in GitHub Desktop.
杀掉超时的线程
#-*- coding: utf-8 -*-
""" Kill a function call that runs past its timeout; packaged as a decorator.
"""
import sys
import time
import threading
class KThread(threading.Thread):
    """A ``threading.Thread`` subclass with a ``kill()`` method.

    ``start()`` swaps the instance's ``run`` for a wrapper that installs a
    trace function (``sys.settrace``) in the new thread.  After ``kill()``
    sets the ``killed`` flag, the trace function raises ``SystemExit`` at the
    next traced ``'line'`` event, which unwinds the thread.

    The kill is therefore cooperative: it only takes effect when the thread
    reaches a new traced Python line.  Code that produces no line events is
    not interrupted until it gets back to traced Python code.
    """

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        # Set by kill(); polled by localtrace() inside the running thread.
        self.killed = False

    def start(self):
        """Start the thread with ``run`` replaced by the tracing wrapper."""
        self.__run_backup = self.run
        # Shadow run() on the instance so Thread.start() ends up executing
        # __run, which installs the trace before calling the real run().
        self.run = self.__run
        threading.Thread.start(self)

    def __run(self):
        """Install the trace hook, call the real ``run``, then clean up."""
        previous_trace = sys.gettrace()
        sys.settrace(self.globaltrace)
        try:
            self.__run_backup()
        finally:
            # Always undo both changes, even when the target raised or the
            # thread is being unwound by the SystemExit from localtrace():
            # put back whatever tracer was active before and restore run().
            # This frame was entered before settrace(), so it is not traced
            # itself and the cleanup cannot be interrupted by the kill check.
            sys.settrace(previous_trace)
            self.run = self.__run_backup

    def globaltrace(self, frame, why, arg):
        """Global trace hook: trace each newly entered frame with localtrace."""
        if why == 'call':
            return self.localtrace
        return None

    def localtrace(self, frame, why, arg):
        """Per-frame trace hook: raise ``SystemExit`` on a line once killed."""
        if self.killed and why == 'line':
            raise SystemExit()
        return self.localtrace

    def kill(self):
        """Request termination; acts at the thread's next traced line."""
        self.killed = True
class Timeout(Exception):
    """Raised when a function call runs longer than its allowed time."""
def thread_timeout(seconds):
    """Build a decorator that limits how long each call may run.

    The decorated function is executed in a ``KThread``.  The caller waits at
    most ``seconds`` for it to finish; if the thread is still alive after
    that, it is asked to stop via ``KThread.kill()`` and ``Timeout`` is
    raised in the caller.

    Args:
        seconds: Maximum time to wait, passed straight to ``Thread.join``.

    Returns:
        A decorator.  Calling the decorated function returns the wrapped
        function's return value, re-raises whatever the wrapped function
        raised, or raises ``Timeout`` when the time limit is exceeded.
    """
    import functools  # local import keeps this block self-contained

    def timeout_decorator(func):
        """Wrap ``func`` so every call runs in a worker thread with a limit."""

        def _run_and_record(outcome, call_args, call_kwargs):
            # Runs in the worker thread.  Record either the return value or
            # the exception so the calling thread can hand it back; without
            # this an exception would die with the worker and the caller
            # would only see a missing result.
            try:
                outcome['value'] = func(*call_args, **call_kwargs)
            except BaseException as exc:  # forwarded to the caller below
                outcome['error'] = exc

        @functools.wraps(func)
        def _(*args, **kwargs):
            outcome = {}
            thd = KThread(target=_run_and_record,
                          args=(outcome, args, kwargs))
            thd.start()
            thd.join(seconds)
            if thd.is_alive():
                thd.kill()  # ask the still-running worker to stop
                # %s rather than %d so a fractional limit is not truncated.
                raise Timeout(
                    u'function run too long, timeout %s seconds.' % seconds)
            if 'error' in outcome:
                # Re-raise in the caller what the wrapped function raised.
                raise outcome['error']
            return outcome['value']

        return _

    return timeout_decorator
@thread_timeout(5)
def method_timeout(seconds, text):
    """Demo function: sleep for ``seconds`` and then return ``seconds``.

    Prints a 'start' line before sleeping and a 'finish' line afterwards,
    each followed by ``seconds`` and ``text``.  It is decorated with
    ``thread_timeout(5)``.
    """
    # Single-argument print(...) with % formatting produces the same
    # space-separated output as the old print statement and also parses
    # on Python 3.
    print('start %s %s' % (seconds, text))
    time.sleep(seconds)
    print('finish %s %s' % (seconds, text))
    return seconds
if __name__ == '__main__':
    # Demo: call method_timeout with sleeps of 1..9 seconds.  Each call
    # prints its return value, or the Timeout message if one is raised.
    for sec in range(1, 10):
        try:
            print('*' * 20)
            print(method_timeout(sec, 'test waiting %d seconds' % sec))
        except Timeout as e:
            print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment