Last active
June 6, 2017 04:09
-
-
Save yabberyabber/6ca32b2e6e291a40e927bc408813c936 to your computer and use it in GitHub Desktop.
Python macros. Transform some valid python code into other valid python code using a set of user-defined transformations.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
./e_apply_macros.py b_example_macros.py c_input_file.py > d_output_file.py |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Transforms are of the form | |
Transform(symbols, pattern, template) | |
Find things in the code that look like |pattern| and replace them | |
with code that looks like |template|. | |
""" | |
[ | |
Transform({'itr': 'identifier', 'src': 'identifier'}, | |
'[urllib.request.urlopen(itr).read() for itr in src]', | |
"""[x.get() for x in | |
[lazy_call(lambda y: urllib.request.urlopen(y).read(), (itr, )) | |
for itr in src]]"""), | |
Transform({'condition': 'any', 'ifcase': 'any', 'elsecase': 'any'}, | |
'cond(condition, ifcase, elsecase)', | |
'(ifcase if condition else elsecase)') | |
] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request | |
from z_lazy import lazy_call | |
# Example input for the macro expander.  The URL comprehensions and the
# cond(...) call below are written in the exact shapes that the patterns in
# b_example_macros.py look for.

# Pages fetched by the comprehensions below.
urls = [
    'https://en.wikipedia.org/wiki/Main_Page',
    'https://en.wikipedia.org/wiki/John_Sherman',
    'https://en.wikipedia.org/wiki/Republican_Party_(United_States)',
    'https://en.wikipedia.org/wiki/Assassination_of_Abraham_Lincoln',
    'https://en.wikipedia.org/wiki/Andrew_Johnson',
    'https://en.wikipedia.org/wiki/Governor_of_Tennessee',
    'https://en.wikipedia.org/wiki/Tennessee',
    'https://en.wikipedia.org/wiki/Cherokee_language',
    'https://en.wikipedia.org/wiki/Polysynthetic_language',
    'https://en.wikipedia.org/wiki/Imperial_Japanese_Navy',
    'https://en.wikipedia.org/wiki/Empire_of_Japan',
    'https://en.wikipedia.org/wiki/Japanese_colonial_empire',
    'https://en.wikipedia.org/wiki/Manchuria',
    'https://en.wikipedia.org/wiki/Japanese_people'
]

# Module-level counter mutated by incX().
x = 0


def incX():
    """Increment the module-level counter x and return its new value."""
    global x
    x = x + 1
    return x


# Has the shape of the urlopen pattern (itr=url, src=urls).
content = [urllib.request.urlopen(url).read() for url in urls]
# The next two comprehensions do not call urlopen, so the urlopen pattern
# does not apply to them.
lowered = [html.lower() for html in content]
inc = [incX() for z in urls]
# Second comprehension with the urlopen shape (itr=url2, src=urls).
content_b = [urllib.request.urlopen(url2).read() for url2 in urls]


def do_something():
    """Print "boo!"."""
    print("boo!")


def do_somethingelse():
    """Print "yay!"."""
    print("yay!")


# cond is not defined in this file; this call is the macro form that the
# second Transform in b_example_macros.py rewrites into a conditional
# expression.
a = cond(False, do_something(), do_somethingelse())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request | |
from z_lazy import lazy_call | |
# Output of running e_apply_macros.py with b_example_macros.py over
# c_input_file.py: the two urlopen comprehensions and the cond(...) call
# appear here in their expanded form.

# Pages fetched by the comprehensions below.
urls = [
    'https://en.wikipedia.org/wiki/Main_Page',
    'https://en.wikipedia.org/wiki/John_Sherman',
    'https://en.wikipedia.org/wiki/Republican_Party_(United_States)',
    'https://en.wikipedia.org/wiki/Assassination_of_Abraham_Lincoln',
    'https://en.wikipedia.org/wiki/Andrew_Johnson',
    'https://en.wikipedia.org/wiki/Governor_of_Tennessee',
    'https://en.wikipedia.org/wiki/Tennessee',
    'https://en.wikipedia.org/wiki/Cherokee_language',
    'https://en.wikipedia.org/wiki/Polysynthetic_language',
    'https://en.wikipedia.org/wiki/Imperial_Japanese_Navy',
    'https://en.wikipedia.org/wiki/Empire_of_Japan',
    'https://en.wikipedia.org/wiki/Japanese_colonial_empire',
    'https://en.wikipedia.org/wiki/Manchuria',
    'https://en.wikipedia.org/wiki/Japanese_people'
]

# Module-level counter mutated by incX().
x = 0


def incX():
    """Increment the module-level counter x and return its new value."""
    global x
    x = x + 1
    return x


# Expanded urlopen macro: the inner comprehension hands every URL to
# lazy_call, the outer one then calls .get() on each handle in order.
content = [x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y).read(), (url, ))
for url in urls]]
lowered = [html.lower() for html in content]
inc = [incX() for z in urls]
# Same expansion as for `content`, with url2 as the loop variable.
content_b = [x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y).read(), (url2, ))
for url2 in urls]]


def do_something():
    """Print "boo!"."""
    print("boo!")


def do_somethingelse():
    """Print "yay!"."""
    print("yay!")


# Expanded cond(False, ...): a conditional expression, so only the selected
# branch (here do_somethingelse()) is evaluated.
a = (do_something() if False else do_somethingelse())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from redbaron import RedBaron | |
from f_dominos import Transform | |
def do_conversion(macro_def_fname, input_fname):
    """
    Apply every macro defined in |macro_def_fname| to the Python source in
    |input_fname| and return the transformed source as a string.

    The macro definition file is Python code whose last statement is an
    expression that evaluates to an iterable of Transform objects.  Any
    statements before it (for example a leading docstring) are executed
    first in the same namespace, in which the name Transform is predefined.

    Raises SyntaxError if the macro file is not valid Python or does not end
    with an expression.
    """
    # Local import under an alias: the stdlib module is only needed here.
    import ast as python_ast

    with open(macro_def_fname, 'r') as macro_def_file:
        macro_source = macro_def_file.read()
    with open(input_fname, 'r') as input_file:
        tree = RedBaron(input_file.read())

    # SECURITY: the macro file is executed as arbitrary Python code.  Only
    # run macro definition files that you trust.
    namespace = {"Transform": Transform}
    module = python_ast.parse(macro_source, filename=macro_def_fname)
    if not module.body or not isinstance(module.body[-1], python_ast.Expr):
        raise SyntaxError(
            "%s must end with an expression yielding the list of transforms"
            % macro_def_fname)
    # Split off the final expression so that its value can be captured; a
    # bare eval() of the whole file rejects anything but a single expression
    # (e.g. a docstring followed by the list).
    final_expr = python_ast.Expression(body=module.body.pop().value)
    exec(compile(module, macro_def_fname, 'exec'), namespace)
    macro_def = eval(compile(final_expr, macro_def_fname, 'eval'), namespace)

    for macro in macro_def:
        tree = macro.apply(tree)
    return tree.dumps()
if __name__ == '__main__':
    if len(sys.argv) == 3:
        print(do_conversion(sys.argv[1], sys.argv[2]))
    else:
        # stdout is normally redirected into the generated file, so the usage
        # text must go to stderr; sys.exit(str) prints there and exits with
        # status 1.
        sys.exit("Usage: %s macro_description_file.py in_file.py"
                 % sys.argv[0])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Generic macro system | |
Built on RedBaron but it's delivery | |
Define a macro like | |
transform = Transform( | |
{'itr': 'identifier', 'src': 'identifier', 'oops': 'any'}, | |
'[oops for itr in src]', | |
"[x.get() for x in | |
[lazy_call(lambda y: urllib.request.urlopen(y), (itr, )) | |
for itr in src]]") | |
""" | |
import unittest | |
from redbaron import RedBaron | |
def node_type(thing):
    """
    Return a concise string naming the type of |thing|
    (e.g. node_type() of a redbaron NameNode instance is "NameNode").
    """
    # The class name is available directly; parsing the repr of the type
    # breaks for classes whose metaclass customises that repr.
    return type(thing).__name__
def join_symbols(symbolss):
    """
    Merge a sequence of symbol tables into a single table.

    A symbol may appear in several tables as long as every occurrence has
    an equal value; otherwise SymbolMismatchException is raised.
    """
    merged = {}
    for table in symbolss:
        for name, value in table.items():
            if name not in merged:
                merged[name] = value
                continue
            # Already bound: the new binding has to agree with the old one.
            if not (merged[name] == value):
                raise SymbolMismatchException()
    return merged
class NoMatchException(Exception):
    """
    Raised by resolve_symbols when the two expressions being compared do
    not match.
    """
def _print(*msg):
    """
    Debug hook used throughout this module: accepts any number of message
    parts and discards them.
    """
    return None
class SymbolMismatchException(Exception):
    """
    Raised by join_symbols (and therefore propagated out of resolve_symbols)
    when there are multiple conflicting definitions of a symbol.
    """
def resolve_symbols(symbols, a, b):
    """
    Check that |a| (a node of the code being examined) and |b| (the
    corresponding node of the pattern) are matching expressions and build a
    dictionary of symbol resolutions.

    |symbols| is a mapping from symbol names to their types: 'any' matches
    any node, 'identifier' matches only a NameNode.

    @returns a mapping from symbol names to the source text they matched
    @raises NoMatchException if |a| does not match |b|
    @raises SymbolMismatchException (via join_symbols) if one symbol would
            be bound to two different values
    """
    _print("matching ", node_type(a), ": \n\t", a, "\n\t", b)

    # A pattern name that is declared in |symbols| is a placeholder: bind it
    # to the source text of |a| if |a| satisfies the placeholder's type.
    if node_type(b) == 'NameNode' and b.value in symbols:
        _print("Found something in symbols!", node_type(a))
        if symbols[str(b)] == 'any':
            return {str(b): str(a)}
        elif symbols[str(b)] == 'identifier' and node_type(a) == 'NameNode':
            return {str(b): str(a)}
        else:
            raise NoMatchException()

    if type(a) != type(b):
        _print("type diff error: %s != %s" %
               (str(type(a)), str(type(b))))
        raise NoMatchException()

    kind = node_type(a)
    if kind in ('NodeList', 'DotProxyList', 'CommaProxyList'):
        items_a = list(a)
        items_b = list(b)
        # zip() stops at the shorter sequence, so without this check a call
        # with too few/many arguments or a shorter/longer attribute chain
        # would be reported as a match.
        if len(items_a) != len(items_b):
            raise NoMatchException()
        return join_symbols([resolve_symbols(symbols, s_a, s_b)
                             for (s_a, s_b) in zip(items_a, items_b)])
    elif kind == 'NameNode':
        # Plain (non-placeholder) names have to be spelled identically.
        if str(a) == str(b):
            return {}
        raise NoMatchException()
    elif kind == 'DotNode':
        return {}
    elif kind == 'CallNode':
        return resolve_symbols(symbols, a.value, b.value)
    elif kind == "ComprehensionLoopNode":
        return join_symbols([
            resolve_symbols(symbols, a.iterator, b.iterator),
            resolve_symbols(symbols, a.target, b.target),
            resolve_symbols(symbols, a.ifs, b.ifs)])
    elif kind == "ListComprehensionNode":
        resolved = join_symbols([
            resolve_symbols(symbols, a.result, b.result),
            resolve_symbols(symbols, a.generators, b.generators)])
        _print(resolved)
        return resolved
    elif kind == "AtomtrailersNode":
        resolved = join_symbols([resolve_symbols(symbols, a.value, b.value)])
        _print(resolved)
        return resolved
    elif kind == 'CallArgumentNode':
        resolved = join_symbols([
            resolve_symbols(symbols, a.target, b.target),
            resolve_symbols(symbols, a.value, b.value)])
        _print(resolved)
        return resolved
    elif a is None and b is None:
        # Optional attribute absent on both sides.
        resolved = {}
        _print(resolved)
        return resolved

    # Any node kind not listed above is treated as "no match".
    _print("unhandled case:")
    _print(node_type(a))
    _print(node_type(b))
    raise NoMatchException()
def apply_transform(symbols, template):
    """
    Return the first node of a copy of |template| in which every NameNode
    whose value is a key of |symbols| has been replaced with the
    corresponding value from |symbols|.

    |template| itself is not modified.
    """
    filled = template.copy()

    # Locate every placeholder before substituting anything.  Substituting
    # while searching would let a bound value that happens to be spelled
    # like another symbol's name be replaced a second time.
    pending = []
    for s_name, s_val in symbols.items():
        for match in filled.find_all('NameNode', value=s_name):
            pending.append((match, s_name, s_val))

    for match, s_name, s_val in pending:
        match.replace(s_val)
        _print("replacing ", s_name, "with", s_val)
    _print(filled)
    return filled[0]
class Transform(object):
    """
    A single macro: code shaped like |in_pattern| is rewritten into code
    shaped like |out_pattern|, with the names listed in |symbols| acting as
    placeholders.
    """

    def __init__(self, symbols, in_pattern, out_pattern):
        """
        Let Symbols be a dictionary mapping identifiers to expression
        types which don't have to be matched exactly.  In_pattern is a
        string that is some python code.  out_pattern is a string that is
        some other python code.
        """
        self.symbols = symbols
        self.in_pattern = in_pattern
        # Only the first node of the parsed pattern is matched against.
        self.in_pat_expr = RedBaron(in_pattern)[0]
        self.template = RedBaron(out_pattern)

    def find_matches(self, ast):
        """
        Find all the bits in |ast| that match |in_pattern|.

        Yields (symbols, node) pairs, where |symbols| maps each placeholder
        name to the source text it matched and |node| is the matching node.
        """
        # Only nodes of the same kind as the pattern's outermost node can
        # possibly match.
        outer_type = node_type(self.in_pat_expr)
        for top_node in ast.find_all(outer_type):
            _print("found top node", top_node)
            try:
                symbols = resolve_symbols(self.symbols, top_node,
                                          self.in_pat_expr)
            except (NoMatchException, SymbolMismatchException):
                continue
            yield (symbols, top_node)

    def apply(self, ast):
        """
        Return a copy of |ast| in which every match of the pattern has been
        replaced by the filled-in template.  |ast| itself is not modified.
        """
        res = ast.copy()
        for symbols, node in self.find_matches(res):
            node.replace(apply_transform(symbols, self.template))
        return res
# Sample program shared by the tests below: four list comprehensions (two of
# them around urllib.request.urlopen(...).read()) and one cond(...) call.
test_input = """
import urllib.request
urls = []
content = [urllib.request.urlopen(url).read() for url in urls]
lowered = [html.lower() for html in content]
urls = oops
inc = [incX() for z in urls]
content_b = [urllib.request.urlopen(url2).read() for url2 in urls]
a = cond(False, do_something(), do_somethingelse())
"""
class TestStuff(unittest.TestCase):
    """
    Tests for symbol resolution, template filling and complete macros, all
    run against the shared test_input program.
    """

    def test_resolve_symbols(self):
        """The cond(...) pattern matches once and binds all three symbols."""
        ast = RedBaron(test_input)
        src = RedBaron('cond(condition, ifcase, elsecase)')
        dst = RedBaron('(ifcase if condition else elsecase)')
        symbols = {'condition': 'any', 'ifcase': 'any', 'elsecase': 'any'}
        transform = Transform(symbols, src, dst)
        ret = list(transform.find_matches(ast))
        self.assertEqual(len(ret), 1)
        self.assertEqual(ret[0][0], {'condition': 'False',
                                     'ifcase': 'do_something()',
                                     'elsecase': 'do_somethingelse()'})
        self.assertEqual(str(ret[0][1]),
                         "cond(False, do_something(), do_somethingelse())")

    def test_replace_symbols(self):
        """apply_transform substitutes every symbol into the template."""
        pattern = RedBaron('(ifcase if condition else elsecase)')
        symbols = {'condition': 'False',
                   'ifcase': 'do_something()',
                   'elsecase': 'do_somethingelse()'}
        res = apply_transform(symbols, pattern)
        self.assertEqual(str(res),
                         "(do_something() if False else do_somethingelse())")

    def test_find_matches_simple(self):
        """A pattern with an 'any' result matches all four comprehensions."""
        ast = RedBaron(test_input)
        transform = Transform(
            {'itr': 'identifier', 'src': 'identifier', 'oops': 'any'},
            '[oops for itr in src]',
            """
[x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y), (itr, ))
for itr in src]]""")
        ret = list(transform.find_matches(ast))
        self.assertEqual(len(ret), 4)

    def test_find_matches(self):
        """The urlopen pattern matches only the two urlopen comprehensions."""
        ast = RedBaron(test_input)
        transform = Transform(
            {'itr': 'identifier', 'src': 'identifier'},
            '[urllib.request.urlopen(itr).read() for itr in src]',
            """
[x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y), (itr, ))
for itr in src]]""")
        ret = list(transform.find_matches(ast))
        self.assertEqual(len(ret), 2)
        self.assertEqual(
            str(ret[0][1]),
            '[urllib.request.urlopen(url).read() for url in urls]')

    def test_join_symbols(self):
        """Agreeing tables merge; conflicting tables raise."""
        self.assertEqual(join_symbols([{'a': 3, 'b': 2}, {'a': 3, 'c': 1}]),
                         {'a': 3, 'b': 2, 'c': 1})
        self.assertEqual(join_symbols([{'a': 3, 'b': 2}]),
                         {'a': 3, 'b': 2})
        with self.assertRaises(SymbolMismatchException):
            join_symbols([{'a': 3, 'b': 2}, {'a': 2}])

    def test_urllib_macro(self):
        """Applying the urlopen macro rewrites both urlopen comprehensions."""
        ast = RedBaron(test_input)
        symbols = {'itr': 'identifier', 'src': 'identifier'}
        pattern = '[urllib.request.urlopen(itr).read() for itr in src]'
        template = RedBaron("""[x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y), (itr, ))
for itr in src]]""")
        transform = Transform(symbols, pattern, template)
        res = transform.apply(ast)
        self.maxDiff = 9999999
        expectation = RedBaron("""
import urllib.request
urls = []
content = [x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y), (url, ))
for url in urls]]
lowered = [html.lower() for html in content]
urls = oops
inc = [incX() for z in urls]
content_b = [x.get() for x in
[lazy_call(lambda y: urllib.request.urlopen(y), (url2, ))
for url2 in urls]]
a = cond(False, do_something(), do_somethingelse())
""").copy()
        self.assertEqual(str(res), str(expectation))

    def test_cond_macro(self):
        """Applying the cond macro rewrites only the cond(...) call."""
        ast = RedBaron(test_input)
        symbols = {'condition': 'any', 'ifcase': 'any', 'elsecase': 'any'}
        pattern = 'cond(condition, ifcase, elsecase)'
        template = RedBaron('(ifcase if condition else elsecase)')
        transform = Transform(symbols, pattern, template)
        res = transform.apply(ast)
        self.maxDiff = 9999999
        expectation = RedBaron("""
import urllib.request
urls = []
content = [urllib.request.urlopen(url).read() for url in urls]
lowered = [html.lower() for html in content]
urls = oops
inc = [incX() for z in urls]
content_b = [urllib.request.urlopen(url2).read() for url2 in urls]
a = (do_something() if False else do_somethingelse())
""").copy()
        self.assertEqual(str(res), str(expectation))
# Run the unit tests when this module is executed directly.
if __name__ == '__main__':
    unittest.main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Helper library for running something in a separate process and blocking-on-wait | |
""" | |
import multiprocessing as mp | |
def _subprocess(out_pipe, func, params):
    """
    Worker entry point: call |func| with the unpacked |params| and send
    whatever it returns through |out_pipe|.
    """
    out_pipe.send(func(*params))
class _lazyFunc(object):
    """
    Executes |func| in its own process.  Nothing blocks until the value is
    needed (hopefully |func| is already done by that point though).
    """

    def __init__(self, func, args):
        """
        Start |func|(*args) in a new process.

        Hopefully the result of calling _subprocess is something that can
        be sent through a pipe.
        """
        self.recv_pipe, self.send_pipe = mp.Pipe(False)
        self.ret_val = None
        self.finished = False
        self.p = mp.Process(target=_subprocess,
                            args=(self.send_pipe, func, args))
        self.p.start()
        # The child has its own handle on the write end now.  Drop ours so
        # that recv() raises EOFError, instead of blocking forever, if the
        # child exits without sending a result (e.g. |func| raised).
        self.send_pipe.close()

    def get(self):
        """
        Return the result of the call, blocking on the pipe rather than
        doing a busywait (free up cpu to work on subprocess).  The value is
        cached, so later calls return immediately.

        Raises EOFError if the subprocess ended without sending a result.
        """
        if not self.finished:
            self.ret_val = self.recv_pipe.recv()
            self.finished = True
        return self.ret_val

    def __del__(self):
        """
        If this object goes out of scope (or is otherwise deleted), kill
        the subprocess so the program can exit properly.
        """
        # __init__ may have failed before self.p was assigned or started;
        # is_alive() is False for a process that never started.
        proc = getattr(self, 'p', None)
        if proc is not None and proc.is_alive():
            proc.terminate()
def lazy_call(func, args):
    """
    Start |func|(*args) in a separate process and return a handle whose
    get() method blocks until the result is available.
    """
    handle = _lazyFunc(func, args)
    return handle
def parallel_map(func, data):
    """
    Call |func| on every element of |data|, each call in its own process,
    and return the results as a list in the order of |data|.
    """
    # Start every call first, then collect, so the calls can overlap.
    pending = [lazy_call(func, (item, )) for item in data]
    return [job.get() for job in pending]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment