-
-
Save dhagrow/d3414e3c6ae25dfa606238355aea2ca5 to your computer and use it in GitHub Desktop.
""" | |
Python is a dynamic language, and it is relatively easy to dynamically create | |
and modify things such as classes and objects. Functions, however, are quite | |
challenging to create dynamically. | |
One area where we might want to do this is in an RPC library, where a function | |
defined on a server needs to be available remotely on a client. | |
The naive solution is to simply pass arguments to a generic function that | |
accepts `*args` and `**kwargs`. A lot of information is lost with this approach, | |
however, in particular the number of arguments taken. Used in an RPC | |
implementation, this also delays any error feedback until after the arguments | |
have reached the server. | |
If you search online, most practical solutions involve `exec()`. This is | |
generally the approach chosen by many Python RPC libraries. This is, of course, | |
a very insecure solution, one that opens any program up to malicious code | |
execution. | |
This experiment creates a real function at the highest layer available: the AST. | |
There are several challenges to this approach. The most significant is that on | |
the AST layer, function arguments must be defined according to their type. This | |
greatly limits the flexibility allowed when defining a function with Python | |
syntax. | |
This experiment has a few requirements that introduce (and relieve) additional | |
challenges: | |
- Must return a representative function signature to the Python interpreter | |
- Must support both Python 2 and 3 | |
- Must allow serialization to JSON and/or MsgPack | |
""" | |
from __future__ import print_function | |
import ast | |
import sys | |
import types | |
import numbers | |
import collections | |
# True when running under Python 3 or newer; selects the version-specific paths.
PY3 = sys.version_info[0] >= 3
def create_function(name, signature, callback):
    """Dynamically create a function that forwards its arguments to *callback*.

    The function is compiled from a hand-built AST, so it exposes the
    parameters described by *signature* while its body is simply
    ``return callback(<the same arguments>)``.

    :param name: name given to the generated function (passed through
        `to_func_name`).
    :param signature: signature object whose ``parameters`` mapping describes
        the parameters to expose (e.g. from ``inspect.signature``).
    :param callback: callable invoked with the arguments the generated
        function receives.
    :returns: the generated function.
    :raises TypeError: if a default value is not ``None``, a ``bool``, a
        number, ``str``, ``bytes``, ``list``, ``tuple`` or ``dict``, or if
        *signature* contains a keyword-only parameter (not supported yet).
    """
    def node(cls, **fields):
        # AST node layouts differ between Python versions (`posonlyargs`,
        # `type_ignores`, `starargs`, ...), so pass only the fields that this
        # interpreter's grammar actually defines for *cls*.
        known = set(cls._fields)
        return cls(**{k: v for k, v in fields.items() if k in known})

    def load(identifier):
        return ast.Name(id=identifier, ctx=ast.Load())

    def declare(identifier):
        # Python 3 declares parameters with `ast.arg`; Python 2 uses a plain
        # `ast.Name` in `Param` context.
        if has_arg_nodes:
            return node(ast.arg, arg=identifier, annotation=None)
        return ast.Name(id=identifier, ctx=ast.Param())

    # feature detection, instead of guessing from the major version
    has_arg_nodes = hasattr(ast, 'arg')
    has_posonly = 'posonlyargs' in ast.arguments._fields
    legacy_call = 'starargs' in ast.Call._fields

    supported_defaults = (type(None), bool, numbers.Number, str, bytes,
                          list, tuple, dict)

    # vars for the callback call
    call_args = []
    call_keywords = []      # `**kwargs` on grammars without `Call.kwargs`
    call_starargs = None    # grammars with `Call.starargs`
    call_kwargs = None      # grammars with `Call.kwargs`

    # vars for the generated function signature
    func_posonlyargs = []
    func_args = []
    vararg = None
    kwarg = None

    # real default values, attached to the function object at the end
    defaults = []

    # assign args based on *signature*
    for param in viewvalues(signature.parameters):
        if param.default is not param.empty:
            if not isinstance(param.default, supported_defaults):
                err = 'unsupported default argument type: {}'
                raise TypeError(err.format(type(param.default)))
            defaults.append(param.default)

        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):
            call_args.append(load(param.name))
            if param.kind == param.POSITIONAL_ONLY and has_posonly:
                # keep the parameter positional-only where the grammar can
                # express it
                func_posonlyargs.append(declare(param.name))
            else:
                func_args.append(declare(param.name))
        elif param.kind == param.VAR_POSITIONAL:
            vararg = declare(param.name) if has_arg_nodes else param.name
            if legacy_call:
                call_starargs = load(param.name)
            else:
                call_args.append(ast.Starred(value=load(param.name),
                                             ctx=ast.Load()))
        elif param.kind == param.KEYWORD_ONLY:
            err = 'TODO: KEYWORD_ONLY param support, param: {}'
            raise TypeError(err.format(param.name))
        elif param.kind == param.VAR_KEYWORD:
            kwarg = declare(param.name) if has_arg_nodes else param.name
            if legacy_call:
                call_kwargs = load(param.name)
            else:
                call_keywords.append(ast.keyword(arg=None,
                                                 value=load(param.name)))

    # The callback is reached through a private global name. Deriving the
    # name from `callback.__name__` breaks when a parameter shares that name
    # (the local would shadow it) or when the callable has no `__name__`.
    callback_ref = '_callback'
    while callback_ref in signature.parameters:
        callback_ref += '_'

    # generate the ast for the *callback* call
    call_ast = node(ast.Call,
        func=load(callback_ref),
        args=call_args, keywords=call_keywords,
        starargs=call_starargs, kwargs=call_kwargs)

    # The default expressions in the AST only have to exist so the compiled
    # code expects the right number of defaults; they are never evaluated
    # because the module code is not executed. The real values are supplied
    # through `argdefs` below. An empty tuple is used as the placeholder
    # because it needs no version-specific constant node.
    placeholders = [ast.Tuple(elts=[], ctx=ast.Load()) for _ in defaults]

    # generate the function ast
    arguments_ast = node(ast.arguments,
        posonlyargs=func_posonlyargs, args=func_args, vararg=vararg,
        kwonlyargs=[], kw_defaults=[], kwarg=kwarg, defaults=placeholders)
    func_ast = node(ast.FunctionDef,
        name=to_func_name(name), args=arguments_ast,
        body=[ast.Return(value=call_ast)],
        decorator_list=[], returns=None, type_comment=None, type_params=[])

    # compile the ast and get the function code
    mod_ast = node(ast.Module, body=[func_ast], type_ignores=[])
    ast.fix_missing_locations(mod_ast)
    module_code = compile(mod_ast, '<generated-ast>', 'exec')
    func_code = next(const for const in module_code.co_consts
                     if isinstance(const, types.CodeType))

    # return the generated function
    return types.FunctionType(func_code, {callback_ref: callback},
                              argdefs=tuple(defaults))
## | |
## support functions | |
## | |
def viewitems(obj):
    """Return the items of *obj*, preferring `viewitems()` when it exists."""
    fallback = obj.items
    view = getattr(obj, "viewitems", fallback)
    return view()
def viewkeys(obj):
    """Return the keys of *obj*, preferring `viewkeys()` when it exists."""
    fallback = obj.keys
    view = getattr(obj, "viewkeys", fallback)
    return view()
def viewvalues(obj):
    """Return the values of *obj*, preferring `viewvalues()` when it exists."""
    fallback = obj.values
    view = getattr(obj, "viewvalues", fallback)
    return view()
def to_func_name(name):
    """Convert *name* to the string type used for function names.

    Python 3 gets the `to_unicode` result; Python 2 gets the `to_bytes`
    result, because `func.__name__` must be bytes there.
    """
    if PY3:
        return to_unicode(name)
    return to_bytes(name)
def to_bytes(s, encoding='utf8'):
    """Return *s* encoded with *encoding* when it is text.

    Bytes and any non-text value are returned unchanged.
    """
    # `type(u'')` is `unicode` on Python 2 and `str` on Python 3. Testing
    # against `str` alone never matched text on Python 2, where `str` is
    # `bytes`, so unicode input was returned without being encoded.
    if isinstance(s, type(u'')):
        return s.encode(encoding)
    return s
def to_unicode(s, encoding='utf8'):
    """Recursively decode bytes found in *s* to text using *encoding*.

    - bytes are decoded
    - text is returned unchanged
    - a dict becomes a new dict with converted keys and values
    - any other iterable becomes a list of converted elements
    - everything else is returned unchanged
    """
    # `collections.Iterable` was removed in Python 3.10; the ABC lives in
    # `collections.abc` on Python 3 and in `collections` on Python 2.
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable

    if isinstance(s, bytes):
        return s.decode(encoding)
    # `type(u'')` is the text type on both Python 2 and 3; checking it before
    # the iterable branch keeps text from being split into characters.
    if isinstance(s, type(u'')):
        return s
    if isinstance(s, dict):
        # propagate *encoding* so nested bytes use the caller's choice
        return {to_unicode(k, encoding): to_unicode(v, encoding)
                for k, v in s.items()}
    if isinstance(s, Iterable):
        return [to_unicode(x, encoding) for x in s]
    return s
## | |
## demo | |
## | |
def main():
    """Demo: clone the signature of a sample function onto a generic callback
    and print the original and the clone side by side.
    """
    # `signature` comes from the stdlib on Python 3, from `funcsigs` otherwise
    if not PY3:
        from funcsigs import signature
    else:
        from inspect import signature

    # the function whose signature is cloned
    def original(a, b, *args, **kwargs):
        return a, b, args, kwargs

    # the generic target that the clone forwards to
    def callback(*args, **kwargs):
        return args, kwargs

    original_sig = signature(original)
    print('original:', original)
    print('original signature:', original_sig)
    print('original ret:', original(1, 2, 4, borp='torp'))

    clone = create_function('clone', original_sig, callback)
    clone_sig = signature(clone)
    print('cloned:', clone)
    print('cloned signature:', clone_sig)
    print('cloned ret:', clone(1, 2, 4, borp='torp'))


if __name__ == "__main__":
    main()
Thanks for this! That was very helpful!
I've added keyword-only support (python 3 only) here: https://gist.github.com/dsuess/1a5919b598f54d24010eb0a7a79e71a0
hello @dhagrow I wonder what is the license of this code in case I would want to use it.
For the record, I ultimately relented and opted for an `exec()`-based solution for my needs, which I believe can be implemented securely. I found that this dynamic version required too much maintenance, because the AST was changing even between minor versions of Python.
thanks! In the end I didn't end-up needing it. But I still found it very educational 👍
btw in my case I was just doing terrible hack so robustness sort of went out of window from get go. You can see it here https://github.com/turboMaCk/pytriarchy
For Python 3.11, this needs `posonlyargs=[]` in the `ast.arguments` call, and `type_ignores=[]` in the `ast.Module` call.
Output of the script ...
Python 2:
Note that the funcsigs package is required for Python 2.
Python 3: