Skip to content

Instantly share code, notes, and snippets.

@dhagrow
Last active October 30, 2024 01:44
Show Gist options
  • Save dhagrow/d3414e3c6ae25dfa606238355aea2ca5 to your computer and use it in GitHub Desktop.
Save dhagrow/d3414e3c6ae25dfa606238355aea2ca5 to your computer and use it in GitHub Desktop.
Dynamic function creation in Python
"""
Python is a dynamic language, and it is relatively easy to dynamically create
and modify things such as classes and objects. Functions, however, are quite
challenging to create dynamically.
One area where we might want to do this is in an RPC library, where a function
defined on a server needs to be available remotely on a client.
The naive solution is to simply pass arguments to a generic function that
accepts `*args` and `**kwargs`. A lot of information is lost with this approach,
however, in particular the number of arguments taken. Used in an RPC
implementation, this also delays any error feedback until after the arguments
have reached the server.
If you search online, most practical solutions involve `exec()`. This is
generally the approach chosen by many Python RPC libraries. This is, of course,
a very insecure solution, one that opens any program up to malicious code
execution.
This experiment creates a real function at the highest layer available: the AST.
There are several challenges to this approach. The most significant is that on
the AST layer, function arguments must be defined according to their type. This
greatly limits the flexibility allowed when defining a function with Python
syntax.
This experiment has a few requirements that introduce (and relieve) additional
challenges:
- Must return a representative function signature to the Python interpreter
- Must support both Python 2 and 3
- Must allow serialization to JSON and/or MsgPack
"""
from __future__ import print_function
import ast
import sys
import types
import numbers
import collections
# True when running on Python 3 or newer; selects the version-specific code paths
PY3 = sys.version_info[0] >= 3
def _ast_node(cls, **fields):
    """Instantiate AST node class *cls* using only the fields it defines.

    Node layouts differ between Python versions (for example ``Call`` lost
    ``starargs``/``kwargs`` while ``arguments`` gained ``posonlyargs`` and
    ``Module`` gained ``type_ignores``), and newer interpreters warn about or
    reject unknown keyword arguments. Filtering on ``cls._fields`` lets the
    caller pass the superset of fields and have each interpreter receive
    exactly the ones it understands.
    """
    accepted = {key: value for key, value in fields.items()
                if key in cls._fields}
    return cls(**accepted)


def create_function(name, signature, callback):
    """Dynamically create a function that forwards its arguments to *callback*.

    The returned function is compiled from an AST (no ``exec()`` of source
    text) and exposes the parameters described by *signature*, so arity
    errors are raised at call time, before *callback* is ever invoked.

    Parameters:
        name: name of the generated function; text or bytes, coerced to the
            native ``str`` type.
        signature: an ``inspect.Signature`` (or ``funcsigs.Signature``)
            describing the parameters of the generated function.
        callback: callable receiving the arguments; positional parameters are
            passed positionally, ``*args`` / ``**kwargs`` are unpacked.

    Returns:
        A new function object named *name* that returns
        ``callback(<arguments>)``.

    Raises:
        TypeError: if a default value is not ``None``, a number (including
            ``bool``), a string, ``bytes``, ``list``, ``tuple`` or ``dict``,
            or if *signature* contains a keyword-only parameter.
    """
    if not isinstance(name, str):
        # `FunctionDef.name` must be the native `str` type: decode bytes on
        # Python 3, encode unicode on Python 2
        if isinstance(name, bytes):
            name = name.decode('utf8')
        else:
            name = name.encode('utf8')

    parameters = signature.parameters

    # the callback is reached through a global of the generated function;
    # pick a name that no parameter can shadow
    callback_name = '_callback'
    while callback_name in parameters:
        callback_name = '_' + callback_name

    # detect the AST layout of the running interpreter
    has_arg_nodes = hasattr(ast, 'arg')  # PY3
    has_posonly = 'posonlyargs' in ast.arguments._fields  # PY3.8+
    legacy_call = 'starargs' in ast.Call._fields  # PY2 and PY3 < 3.5

    # `type(u'')` is `unicode` on Python 2 and plain `str` on Python 3
    supported_defaults = (type(None), numbers.Number, str, bytes, type(u''),
                          list, tuple, dict)

    def load(identifier):
        # expression that reads the variable *identifier*
        return _ast_node(ast.Name, id=identifier, ctx=ast.Load())

    def declare(identifier):
        # parameter declaration for *identifier*
        if has_arg_nodes:
            return _ast_node(ast.arg, arg=identifier, annotation=None,
                             type_comment=None)
        return _ast_node(ast.Name, id=identifier, ctx=ast.Param())

    # vars for the callback call
    call_args = []
    call_keywords = []
    call_starargs = None  # legacy `Call` layout only
    call_kwargs = None  # legacy `Call` layout only

    # vars for the generated function signature
    posonly_args = []
    func_args = []
    vararg = None
    kwarg = None

    # Default values are attached to the function object via `argdefs`
    # below. The code object compiled from the AST is the same with or
    # without default expressions, so none are emitted into the AST.
    defaults = []

    # assign args based on *signature*
    for param in parameters.values():
        if param.default is not param.empty:
            if not isinstance(param.default, supported_defaults):
                err = 'unsupported default argument type: {}'
                raise TypeError(err.format(type(param.default)))
            defaults.append(param.default)

        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):
            call_args.append(load(param.name))
            if has_posonly and param.kind == param.POSITIONAL_ONLY:
                posonly_args.append(declare(param.name))
            else:
                func_args.append(declare(param.name))
        elif param.kind == param.VAR_POSITIONAL:
            if legacy_call:
                call_starargs = load(param.name)
            else:
                call_args.append(_ast_node(ast.Starred,
                                           value=load(param.name),
                                           ctx=ast.Load()))
            vararg = declare(param.name) if has_arg_nodes else param.name
        elif param.kind == param.KEYWORD_ONLY:
            err = 'TODO: KEYWORD_ONLY param support, param: {}'
            raise TypeError(err.format(param.name))
        elif param.kind == param.VAR_KEYWORD:
            if legacy_call:
                call_kwargs = load(param.name)
            else:
                # a keyword without a name is how `**mapping` is spelled
                call_keywords.append(_ast_node(ast.keyword, arg=None,
                                               value=load(param.name)))
            kwarg = declare(param.name) if has_arg_nodes else param.name

    # generate the ast for the *callback* call
    call_node = _ast_node(ast.Call,
                          func=load(callback_name),
                          args=call_args, keywords=call_keywords,
                          starargs=call_starargs, kwargs=call_kwargs)

    # generate the function ast
    arguments_node = _ast_node(ast.arguments,
                               posonlyargs=posonly_args, args=func_args,
                               vararg=vararg, kwonlyargs=[], kw_defaults=[],
                               kwarg=kwarg, defaults=[])
    func_node = _ast_node(ast.FunctionDef,
                          name=name, args=arguments_node,
                          body=[_ast_node(ast.Return, value=call_node)],
                          decorator_list=[], returns=None,
                          type_comment=None, type_params=[])

    # compile the ast and get the function code
    module_node = _ast_node(ast.Module, body=[func_node], type_ignores=[])
    # the compiler requires source positions on every statement/expression
    ast.fix_missing_locations(module_node)
    module_code = compile(module_node, '<generated-ast>', 'exec')
    func_code = next(const for const in module_code.co_consts
                     if isinstance(const, types.CodeType))

    # return the generated function
    return types.FunctionType(func_code, {callback_name: callback},
                              argdefs=tuple(defaults))
##
## support functions
##
def viewitems(obj):
    """Return ``obj.viewitems()`` when that method exists, else ``obj.items()``."""
    # the `items` fallback is looked up eagerly, so *obj* must always have it
    fallback = obj.items
    accessor = getattr(obj, "viewitems", fallback)
    return accessor()
def viewkeys(obj):
    """Return ``obj.viewkeys()`` when that method exists, else ``obj.keys()``."""
    # the `keys` fallback is looked up eagerly, so *obj* must always have it
    fallback = obj.keys
    accessor = getattr(obj, "viewkeys", fallback)
    return accessor()
def viewvalues(obj):
    """Return ``obj.viewvalues()`` when that method exists, else ``obj.values()``."""
    # the `values` fallback is looked up eagerly, so *obj* must always have it
    fallback = obj.values
    accessor = getattr(obj, "viewvalues", fallback)
    return accessor()
def to_func_name(name):
    """Convert *name* to the string type used for function names.

    Delegates to `to_unicode` when `PY3` is true and to `to_bytes` otherwise.
    """
    # func.__name__ must be bytes in Python2
    if PY3:
        return to_unicode(name)
    return to_bytes(name)
def to_bytes(s, encoding='utf8'):
    """Return *s* encoded to bytes if it is text, otherwise *s* unchanged.

    Parameters:
        s: the value to convert; byte strings and non-string values are
            returned as-is.
        encoding: codec used to encode text (default ``'utf8'``).

    Returns:
        ``s.encode(encoding)`` for text input, else *s* itself.
    """
    # `type(u'')` is `unicode` on Python 2 and `str` on Python 3. Testing
    # against `str` would miss Python 2 text, because there `str` is `bytes`.
    if isinstance(s, type(u'')):
        return s.encode(encoding)
    return s
def to_unicode(s, encoding='utf8'):
    """Recursively decode byte strings in *s* to text.

    Parameters:
        s: the value to convert.
        encoding: codec used to decode byte strings (default ``'utf8'``).

    Returns:
        - decoded text for a byte string;
        - text unchanged;
        - a new ``dict`` with converted keys and values for a ``dict``;
        - a ``list`` of converted elements for any other iterable;
        - *s* itself for everything else.
    """
    # `collections.Iterable` was removed in Python 3.10; the ABCs live in
    # `collections.abc` on Python 3 and in `collections` on Python 2
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable

    if isinstance(s, bytes):
        return s.decode(encoding)
    # `type(u'')` is `unicode` on Python 2 and `str` on Python 3; text must be
    # caught here so it is not split into characters by the iterable branch
    if isinstance(s, type(u'')):
        return s
    if isinstance(s, dict):
        return {to_unicode(k, encoding): to_unicode(v, encoding)
                for k, v in viewitems(s)}
    if isinstance(s, Iterable):
        return [to_unicode(x, encoding) for x in s]
    return s
##
## demo
##
def main():
    """Demo: print a function, its signature and a call result, then the same
    for a clone generated by `create_function` around a generic callback.
    """
    # `inspect.signature` is used on Python 3, the `funcsigs` package otherwise
    if not PY3:
        from funcsigs import signature
    else:
        from inspect import signature

    # original function
    def original(a, b, *args, **kwargs):
        return a, b, args, kwargs

    # generic target that the clone forwards to
    def callback(*args, **kwargs):
        return args, kwargs

    original_sig = signature(original)
    print('original:', original)
    print('original signature:', original_sig)
    print('original ret:', original(1, 2, 4, borp='torp'))

    # cloned function
    cloned = create_function('clone', original_sig, callback)
    cloned_sig = signature(cloned)
    print('cloned:', cloned)
    print('cloned signature:', cloned_sig)
    print('cloned ret:', cloned(1, 2, 4, borp='torp'))
# run the demo only when this file is executed as a script
if __name__ == '__main__':
    main()
@shaunc
Copy link

shaunc commented Oct 30, 2024

For Python 3.11, this needs `posonlyargs=[]` in the `ast.arguments` call and `type_ignores=[]` in the `ast.Module` call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment