Created
March 8, 2017 14:36
-
-
Save gyglim/54ebb90f7fc93417cf15ebd2c9abc97a to your computer and use it in GitHub Desktop.
Decorator to timeout function calls and correctly re-raise Exceptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Function to build timeout dectorators for specific times. | |
Example usage: | |
@timeout(1) | |
def x(): time.sleep(2) | |
x() | |
will raise a TimeoutException. | |
""" | |
__author__ = "Michael Gygli, Gifs.com" | |
import Queue | |
import sys | |
import threading | |
import time | |
from multiprocessing.pool import TimeoutError | |
def timeout(max_time): | |
"""Builds a timeout decorator for the specified max_time. | |
The timeout decorator uses a thread to run the decorated function. | |
Upon timeout, it raises a multiprocessing.pool.TimeoutError | |
Exceptions in the function call are reraised and thus passed to the caller. | |
Parameters | |
---------- | |
max_time : float | |
time in seconds after which the execution will be stoped | |
Returns | |
------- | |
timeout | |
a timeout decorator function for the specified time | |
Examples: | |
>>> def x(): return ('Hello', 'World') | |
>>> timeout(1)(x)() | |
('Hello', 'World') | |
>>> def x(): raise IndexError("Test") | |
>>> timeout(1)(x)() | |
Traceback (most recent call last): | |
... | |
IndexError: Test | |
>>> def x(): time.sleep(2) | |
>>> timeout(1)(x)() | |
Traceback (most recent call last): | |
... | |
TimeoutError: Processing took longer than 1 seconds | |
""" | |
def timeout(fn): | |
"""The timeout decorator.""" | |
def wrapper(*args, **kwargs): | |
queue = Queue.Queue() | |
def enqueue_fn(): | |
try: # Try adding the results of the function call | |
queue.put(fn(*args, **kwargs)) | |
except BaseException: | |
# If it fails, enqueue the exception instead | |
queue.put(sys.exc_info()) | |
thread = threading.Thread(target=enqueue_fn) | |
thread.start() | |
thread.join(max_time) | |
if thread.isAlive(): | |
raise TimeoutError("Processing took longer than %s seconds" % max_time) | |
output = queue.get_nowait() | |
# Reraise if an exception occured | |
if isinstance(output, tuple) and type(output[0]) is type and isinstance(output[0](), | |
BaseException): | |
raise output[0], output[1], output[2] | |
else: # return the results otherwise | |
return output | |
return wrapper | |
# Return the timeout decorator | |
return timeout | |
if __name__ == "__main__": | |
import doctest | |
doctest.testmod() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment