Last active
August 25, 2017 05:44
-
-
Save calebmadrigal/c34aca7059453f5c2bb123282753ea50 to your computer and use it in GitHub Desktop.
Python timeout function with ast
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cmadrigal-MBP:ast_stuff caleb.madrigal$ python3 wrap_function_test.py | |
Wrapping function: range | |
Wrapping function: do_sleep | |
check_timeout_proxy() | |
check_timeout_proxy() | |
check_timeout_proxy() | |
check_timeout_proxy() | |
Traceback (most recent call last): | |
File "wrap_function_test.py", line 34, in <module> | |
run_code_string_with_timeout(code, 3) | |
File "wrap_function_test.py", line 29, in run_code_string_with_timeout | |
exec(co) | |
File "<ast>", line 5, in <module> | |
File "wrap_function_test.py", line 10, in check_timeout_proxy | |
raise Exception('Timeout expired') | |
Exception: Timeout expired |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def do_sleep(t): | |
time.sleep(t) | |
for i in range(5): | |
do_sleep(1) | |
result_val = 42 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import time | |
def run_code_string_with_timeout(code_str, timeout): | |
timeout_start_time = time.time() | |
def check_timeout_proxy(node): | |
print('check_timeout_proxy()') | |
if time.time() - timeout_start_time >= timeout: | |
raise Exception('Timeout expired') | |
else: | |
return node | |
class FunctionWrapper(ast.NodeTransformer): | |
def visit_Call(self, node): | |
if node.func and hasattr(node.func, 'id'): | |
print('Wrapping function:', node.func.id) | |
return ast.Call(func=ast.Name(id='check_timeout_proxy', ctx=ast.Load()), | |
args=[node], keywords=[]) | |
else: | |
return node | |
tree = ast.parse(code_str) | |
tree = FunctionWrapper().visit(tree) | |
# Add lineno & col_offset to the nodes we created | |
ast.fix_missing_locations(tree) | |
co = compile(tree, "<ast>", "exec") | |
exec(co) | |
if __name__ == '__main__': | |
code = open('sleep_test.py', 'r').read() | |
run_code_string_with_timeout(code, 3) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment