-
-
Save dsuess/1a5919b598f54d24010eb0a7a79e71a0 to your computer and use it in GitHub Desktop.
Dynamic function creation in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python is a dynamic language, and it is relatively easy to dynamically create | |
and modify things such as classes and objects. Functions, however, are quite | |
challenging to create dynamically. | |
One area where we might want to do this is in an RPC library, where a function | |
defined on a server needs to be available remotely on a client. | |
The naive solution is to simply pass arguments to a generic function that | |
accepts `*args` and `**kwargs`. A lot of information is lost with this approach, | |
however, in particular the number of arguments taken. Used in an RPC | |
implementation, this also delays any error feedback until after the arguments | |
have reached the server. | |
If you search online, most practical solutions involve `exec()`. This is | |
generally the approach chosen by many Python RPC libraries. This is, of course, | |
a very insecure solution, one that opens any program up to malicious code | |
execution. | |
This experiment creates a real function at the highest layer available: the AST. | |
There are several challenges to this approach. The most significant is that on | |
the AST layer, function arguments must be defined according to their type. This | |
greatly limits the flexibility allowed when defining a function with Python | |
syntax. | |
This experiment has a few requirements that introduce (and relieve) additional | |
challenges: | |
- Must return a representative function signature to the Python interpreter | |
- Must support both Python 2 and 3 | |
- Must allow serialization to JSON and/or MsgPack | |
Taken from https://gist.github.com/dhagrow/d3414e3c6ae25dfa606238355aea2ca5 | |
""" | |
from __future__ import print_function | |
import ast | |
import types | |
import numbers | |
import collections | |
def create_function(name, signature, callback): | |
"""Dynamically creates a function that wraps a call to *callback*, based | |
on the provided *signature*. | |
""" | |
# utils to set default values when creating a ast objects | |
Loc = lambda cls, **kw: cls(annotation=None, lineno=1, col_offset=0, **kw) | |
Name = lambda id, ctx=None: Loc(ast.Name, id=id, ctx=ctx or ast.Load()) | |
# vars for the callback call | |
call_args = [] | |
call_keywords = [] | |
# vars for the generated function signature | |
func_args = [] | |
func_kwargs = [] | |
func_defaults = [] | |
func_kwdefaults = [] | |
vararg = None | |
kwarg = None | |
# vars for the args with default values | |
defaults = [] | |
kwdefaults = dict() | |
# assign args based on *signature* | |
for param in viewvalues(signature.parameters): | |
if param.default is not param.empty: | |
if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}: | |
add_to = func_defaults | |
defaults.append(param.default) | |
elif param.kind is param.KEYWORD_ONLY: | |
add_to = func_kwdefaults | |
kwdefaults[param.name] = param.default | |
else: | |
raise TypeError("Shouldnt have defaults for other types") | |
if isinstance(param.default, type(None)): | |
# `ast.NameConstant` is used in PY3, but both support `ast.Name` | |
add_to.append(Name("None")) | |
elif isinstance(param.default, bool): | |
# `ast.NameConstant` is used in PY3, but both support `ast.Name` | |
add_to.append(Name(str(param.default))) | |
elif isinstance(param.default, numbers.Number): | |
add_to.append(Loc(ast.Num, n=param.default)) | |
elif isinstance(param.default, str): | |
add_to.append(Loc(ast.Str, s=param.default)) | |
elif isinstance(param.default, bytes): | |
add_to.append(Loc(ast.Bytes, s=param.default)) | |
elif isinstance(param.default, list): | |
add_to.append(Loc(ast.List, elts=param.default, ctx=ast.Load())) | |
elif isinstance(param.default, tuple): | |
add_to.append(Loc(ast.Tuple, elts=list(param.default), ctx=ast.Load())) | |
elif isinstance(param.default, dict): | |
add_to.append( | |
Loc( | |
ast.Dict, | |
keys=list(viewkeys(param.default)), | |
values=list(viewvalues(param.default)), | |
) | |
) | |
else: | |
err = "unsupported default argument type: {}" | |
raise TypeError(err.format(type(param.default))) | |
elif param.kind is param.KEYWORD_ONLY: | |
# If it's a keyword-only arugment, we need to add a None-default | |
# value | |
func_kwdefaults.append(None) | |
if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}: | |
call_args.append(Name(param.name)) | |
func_args.append(Loc(ast.arg, arg=param.name)) | |
elif param.kind == param.VAR_POSITIONAL: | |
call_args.append(Loc(ast.Starred, value=Name(param.name), ctx=ast.Load())) | |
vararg = Loc(ast.arg, arg=param.name) | |
elif param.kind == param.KEYWORD_ONLY: | |
call_keywords.append( | |
Loc(ast.keyword, arg=param.name, value=Name(param.name)) | |
) | |
func_kwargs.append(Loc(ast.arg, arg=param.name)) | |
elif param.kind == param.VAR_KEYWORD: | |
call_keywords.append(Loc(ast.keyword, arg=None, value=Name(param.name))) | |
kwarg = Loc(ast.arg, arg=param.name) | |
# generate the ast for the *callback* call | |
call_ast = Loc( | |
ast.Call, func=Name(callback.__name__), args=call_args, keywords=call_keywords | |
) | |
# generate the function ast | |
func_ast = Loc( | |
ast.FunctionDef, | |
name=to_func_name(name), | |
args=ast.arguments( | |
args=func_args, | |
vararg=vararg, | |
defaults=func_defaults, | |
kwarg=kwarg, | |
kwonlyargs=func_kwargs, | |
kw_defaults=func_kwdefaults, | |
), | |
body=[Loc(ast.Return, value=call_ast)], | |
decorator_list=[], | |
returns=None, | |
) | |
# compile the ast and get the function code | |
mod_ast = ast.Module(body=[func_ast]) | |
module_code = compile(mod_ast, "<generated-ast>", "exec") | |
func_code = [c for c in module_code.co_consts if isinstance(c, types.CodeType)][0] | |
# return the generated function | |
func = types.FunctionType( | |
func_code, {callback.__name__: callback}, argdefs=tuple(defaults) | |
) | |
func.__kwdefaults__ = kwdefaults | |
return func | |
## | |
## support functions | |
## | |
def viewitems(obj): | |
return getattr(obj, "viewitems", obj.items)() | |
def viewkeys(obj): | |
return getattr(obj, "viewkeys", obj.keys)() | |
def viewvalues(obj): | |
return getattr(obj, "viewvalues", obj.values)() | |
def to_func_name(name): | |
# func.__name__ must be bytes in Python2 | |
return to_unicode(name) | |
def to_bytes(s, encoding="utf8"): | |
if isinstance(s, bytes): | |
pass | |
elif isinstance(s, str): | |
s = s.encode(encoding) | |
return s | |
def to_unicode(s, encoding="utf8"): | |
if isinstance(s, bytes): | |
s = s.decode(encoding) | |
elif isinstance(s, str): | |
pass | |
elif isinstance(s, dict): | |
s = {to_unicode(k): to_unicode(v) for k, v in viewitems(s)} | |
elif isinstance(s, collections.Iterable): | |
s = [to_unicode(x, encoding) for x in s] | |
return s | |
def main(): | |
from inspect import signature | |
# original function | |
def original(a, b, *args, c, d=10, **kwargs): | |
return a, b, args, c, kwargs | |
sig = signature(original) | |
print("original:", original) | |
print("original signature:", sig) | |
print("original ret:", original(1, 2, 4, c=5, borp="torp")) | |
# cloned function | |
def callback(*args, **kwargs): | |
return args, kwargs | |
cloned = create_function("clone", sig, callback) | |
sig = signature(cloned) | |
print("cloned:", cloned) | |
print("cloned signature:", sig) | |
print("cloned ret:", cloned(1, 2, 4, c=5, borp="torp")) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Prints out: