Created
October 7, 2012 18:48
-
-
Save seriyps/3849217 to your computer and use it in GitHub Desktop.
AST transformation for tornado.gen.Task
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
''' | |
Created on 2012-10-07 | |
@author: Sergey <[email protected]> | |
Альтернативный вариант решения из статьи http://habrahabr.ru/post/153595/ | |
на базе модификации AST | |
@asynchronous | |
@gen.engine | |
@shortgen | |
def get(): | |
result << my_task(args, *kwargs) | |
преобразуется в | |
@asynchronous | |
@gen.engine | |
def get(): | |
result = yield gen.Task(my_task, args, *kwargs) | |
''' | |
import ast | |
import marshal | |
import py_compile | |
import time | |
import os.path | |
class RewriteGenTask(ast.NodeTransformer): | |
def __init__(self, *args, **kwargs): | |
self.on_decorator = [] | |
self.on_assign = [] | |
super(RewriteGenTask, self).__init__(*args, **kwargs) | |
def shortgen_deco_pos(self, decorator_list): | |
# проверяет, что в списке декораторов имеется декоратор с именем | |
# shortgen и возвращает его позицию. | |
for pos, deco in enumerate(decorator_list): | |
# Name(id='shortgen', ctx=Load()) | |
if isinstance(deco, ast.Name) and deco.id == 'shortgen': | |
return pos | |
return -1 | |
def visit_FunctionDef(self, node): | |
""" | |
Проверяет, что функция обернута в декоратор shortgen. | |
Если обернута, удаляем декоратор и трансформируем содержимое. | |
FunctionDef( | |
name='get_short', | |
args=arguments(...), | |
body=[...], | |
decorator_list=[ | |
Attribute(value=Name(id='web', ...), attr='asynchronous', ...), | |
Attribute(value=Name(id='gen', ...), attr='engine', ...), | |
Name(id='shortgen', ctx=Load())]) | |
""" | |
deco_pos = self.shortgen_deco_pos(node.decorator_list) | |
if deco_pos >= 0: | |
# если функция обернута в shortgen декоратор, удаляем его, | |
# делаем пометку в стеке и запускаем Visitor по содержимому | |
# функции | |
self.on_decorator.append(True) | |
node.decorator_list.pop(deco_pos) | |
self.generic_visit(node) # трансформируем содержимое функции | |
self.on_decorator.pop() | |
return node | |
def visit_Expr(self, expr): | |
""" | |
== Основная трансформация == | |
Трансформируем | |
result2 << func(arg, k=v, *args, **kwargs) | |
в | |
result2 = gen.Task(func, arg, k=v, *args, **kwargs) | |
Пример AST представления "stmt << func(...)" (исходные данные): | |
Expr(value=BinOp(left=Name(id='result', ctx=Load()), | |
op=LShift(), | |
right=Call( | |
func=Name(id='fetch', ctx=Load()), | |
args=[Num(n=1)], | |
keywords=[keyword(arg='k', value=Num(n=2))], | |
starargs=Tuple(elts=[Num(n=3)], ctx=Load()), | |
kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)]))))) | |
---- vvvvvvvvvvv ---- | |
Пример AST представления "stmt = yield func(...)" (результат): | |
Assign(targets=[Name(id='result', ctx=Store())], | |
value=Yield(value=Call( | |
func=Attribute(value=Name(id='gen', ctx=Load()), | |
attr='Task', ctx=Load()), | |
args=[Name(id='fetch', ctx=Load()), Num(n=1)], | |
keywords=[keyword(arg='k', value=Num(n=2))], | |
starargs=Tuple(elts=[Num(n=3)], ctx=Load()), | |
kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)])))) | |
""" | |
node = expr.value # BinOp | |
if not (self.on_decorator | |
and isinstance(expr.value, ast.BinOp) | |
and isinstance(node.op, ast.LShift)): | |
# если функция не обернута в декоратор (on_decorator пуст), ничего | |
# не меняем | |
return expr | |
# если функция, содержащая LShift, обернута в декоратор, | |
# то заменяем на вызов gen.Task() | |
# для начала конвертируем изменение на месте (stmt <<) на | |
# присваивание (stmt =). Для этого заменяем ctx=Load на | |
# ctx=Store (см self.visit_Load()) | |
self.on_assign.append(True) | |
assign_target = self.visit(node.left) | |
self.on_assign.pop() | |
# генерируем присваивание ... = ... | |
(new_node, ) = ast.Assign( | |
targets = [assign_target], | |
value = ast.Yield( | |
value=self.construct_gen_task_call(node.right))), | |
# копируем номер линии оригинальной конструкции | |
new_node = ast.fix_missing_locations(ast.copy_location(new_node, expr)) | |
return new_node | |
def construct_gen_task_call(self, func_call): | |
""" | |
Конвертируем вызов функции в вызов gen.Task с именем функции первым | |
параметром | |
func(arg, k=v, *args, **kwargs) | |
в | |
gen.Task(func, arg, k=v, *args, **kwargs) | |
Пример AST представления "func(...)": | |
Call( | |
func=Name(id='fetch', ctx=Load()), | |
args=[Num(n=1)], | |
keywords=[keyword(arg='k', value=Num(n=2))], | |
starargs=Tuple(elts=[Num(n=3)], ctx=Load()), | |
kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)]))) | |
---- vvvvvvvvv ---- | |
Пример AST представления "gen.Task(func, ...)": | |
Call( | |
func=Attribute(value=Name(id='gen', ctx=Load()), | |
attr='Task', ctx=Load()), | |
args=[Name(id='fetch', ctx=Load()), Num(n=1)], | |
keywords=[keyword(arg='k', value=Num(n=2))], | |
starargs=Tuple(elts=[Num(n=3)], ctx=Load()), | |
kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)])) | |
""" | |
# Генерируем gen.Task | |
gen_task = ast.Attribute( | |
value=ast.Name(id='gen', ctx=ast.Load()), | |
attr='Task', ctx=ast.Load()) | |
# Генерируем вызов gen.Task(func, ...) | |
call = ast.Call( | |
func=gen_task, | |
# имя оригинальной ф-ции 1-м аргументом: | |
args=[func_call.func] + func_call.args, | |
keywords=func_call.keywords, | |
starargs=func_call.starargs, | |
kwargs=func_call.kwargs) | |
return self.visit(call) | |
def visit_Load(self, node): | |
# Заменяем Load() на Store() | |
if self.on_assign: | |
return ast.copy_location(ast.Store(), node) | |
return node | |
def shortgen(f): | |
raise RuntimeError("ERROR! file must be compiled with yield_ast!") | |
def compile_file(filepath): | |
path, filename = os.path.split(filepath) | |
with open(filepath) as src: | |
orig_ast = ast.parse(src.read()) | |
new_ast = RewriteGenTask().visit(orig_ast) | |
code = compile(new_ast, filename, 'exec') | |
pyc_filename = os.path.splitext(filename)[0] + '.pyc' | |
pyc_filepath = os.path.join(path, pyc_filename) | |
with open(pyc_filepath, 'wb') as fc: | |
fc.write(py_compile.MAGIC) | |
py_compile.wr_long(fc, long(time.time())) | |
marshal.dump(code, fc) | |
fc.flush() | |
if __name__ == '__main__': | |
import sys | |
if len(sys.argv) < 2: | |
print "Usage: %s file_to_compile1.py [file2.py] ..." % sys.argv[0] | |
for filename in sys.argv[1:]: | |
compile_file(filename) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment