Last active
December 26, 2022 21:43
-
-
Save arizvisa/ea757b9d2b3926e73262948756ac7b2e to your computer and use it in GitHub Desktop.
constraints with some fixes and some better scoring for performance testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class multicase(object):
    """
    Decorator class that allows more than one case to be defined for a
    single function, where each case is described by the types that are
    given for its parameters.
    """

    # Bit flags that get compared against the flags of a code object. Each
    # one is written as a shift so that its bit position is explicit.
    # NOTE(review): CO_VARGEN and CO_ITERABLE look like they correspond to
    #               CPython's CO_GENERATOR and CO_ITERABLE_COROUTINE -- confirm.
    CO_OPTIMIZED = 1 << 0
    CO_NEWLOCALS = 1 << 1
    CO_VARARGS = 1 << 2
    CO_VARKEYWORDS = 1 << 3
    CO_NESTED = 1 << 4
    CO_VARGEN = 1 << 5
    CO_NOFREE = 1 << 6
    CO_COROUTINE = 1 << 7
    CO_ITERABLE = 1 << 8

    # NOTE(review): the values for the following flags are believed to have
    #               changed in newer CPython releases -- confirm before
    #               depending on them.
    CO_GENERATOR_ALLOWED = 1 << 12
    CO_FUTURE_DIVISION = 1 << 13
    CO_FUTURE_ABSOLUTE_IMPORT = 1 << 14
    CO_FUTURE_WITH_STATEMENT = 1 << 15
    CO_FUTURE_PRINT_FUNCTION = 1 << 16
    CO_FUTURE_UNICODE_LITERALS = 1 << 17
    CO_FUTURE_BARRY_AS_BDFL = 1 << 18
    CO_FUTURE_GENERATOR_STOP = 1 << 19

    # Names of the attributes on the wrapper which hold the cache (the tree and
    # table of candidates) and the documentation for each case.
    cache_name = '__multicase_cache__'
    documentation_name = '__multicase_documentation__'
def __new__(cls, *other, **t_args):
    '''Decorate a case of a function with the specified types.

    Each keyword in `t_args` names a parameter of the function being decorated
    and gives the type, tuple of types, or `callable` that constrains it. At
    most one already decorated callable may be given in `other` in order to
    choose the function that the new case is added to. If a constraint or an
    item in `other` is not of a supported type, then an `InvalidParameterError`
    is raised. The closure that is returned is the decorator which registers
    the case and returns the re-constructed wrapper.
    '''
    qualified_name = '.'.join([__name__, cls.__name__])

    def describe_constraint(name, types, ordered=False):
        '''Return the "name=types" text used to describe a single constraint inside a message.'''

        # The callable constraint is compared by identity so that describing an
        # unhashable constraint (like a list) does not raise a TypeError.
        if isinstance(types, internal.types.type) or types is internal.types.callable:
            described = types.__name__

        # Anything without a name is described using its representation so that
        # building the message can never hide the error we want to raise.
        elif hasattr(types, '__iter__'):
            names = [getattr(item, '__name__', "{!r}".format(item)) for item in types]
            described = '|'.join(sorted(names) if ordered else names)

        else:
            described = "{!r}".format(types)
        return "{:s}={!s}".format(name, described)

    def describe_keywords():
        '''Return the description of every constraint that the decorator was given.'''
        return ', '.join(describe_constraint(name, types) for name, types in t_args.items())

    # Output some useful information to help the user if there's nothing that satisfies their parameters.
    def missing(packed_parameters, tree, table, documentation=None, ignored_parameter_count=0):
        '''Raise an `UnknownPrototypeError` listing the candidates for the callable that the user is attempting to use.'''
        documentation = {} if documentation is None else documentation
        args, kwds = packed_parameters
        Fcallable, Fhelp, Fprototype = "`{:s}`".format, "`help({:s})`".format, "{:s}".format

        # Basic configuration
        arrow, indent = ' -> ', 4

        # Some useful utilities for speaking english at our users.
        def Fconjunction(final, items):
            '''Join `items` with commas using the `final` format for the last one.'''
            if not items:
                return ''
            if len(items) == 1:
                return items[0]
            return ', '.join(items[:-1]) + final.format(items[-1])
        Fconjunction_or = functools.partial(Fconjunction, ", or {:s}")
        Fconjunction_and = functools.partial(Fconjunction, ", and {:s}")

        sorted_functions = list(cls.sorted_documentation(documentation))

        # Collect all parameter names, keywords, and documentation for describing what happened.
        description_arguments = [item.__class__.__name__ for item in args[ignored_parameter_count:]]
        description_keywords = ["{:s}={!s}".format(name, kwds[name].__class__.__name__) for name in kwds]

        description_functions = {}
        for F in sorted_functions:
            prototype, _, _ = documentation[F]
            description_functions[F] = prototype, getattr(F, '__module__', None), pycompat.code.name(pycompat.function.code(F))

        # Build the error message that is displayed as part of the exception.
        available_names = sorted({'.'.join([module, name]) if module else name for _, module, name in description_functions.values()})
        conjunctioned = Fconjunction_and([Fcallable(item) for item in available_names])
        parameters = ', '.join(itertools.chain(description_arguments, description_keywords))
        description = Fconjunction_or(["{:s}({:s})".format(name, parameters) for name in available_names])
        count = len(description_arguments) + len(description_keywords)
        message = u"{:s}: The given parameter type{:s} not match any of the available cases for the definition of {:s}.".format(description, ' does' if count == 1 else 's do', conjunctioned)

        # Build the list of candidates that the user will need to choose from.
        listing_message = "The functions that are available for {:s} are:".format(Fconjunction_and([Fhelp(item) for item in available_names]))

        prototypes = []
        for F in sorted_functions:
            prototype, module, _ = description_functions[F]
            _, lines, _ = documentation[F]
            qualified = Fprototype('.'.join([module, prototype]) if module else prototype)
            prototypes.append((qualified, "{:s}{:s}".format(arrow, lines[0]) if len(lines) else ''))

        # Calculate some lengths and use them to format our output in some meaningful way.
        maximum = max(len(prototype) for prototype, _ in prototypes) if prototypes else 0
        rows = ("{: <{:d}s}{:s}".format(prototype, maximum, summary) for prototype, summary in prototypes)
        listing = ["{: <{:d}s}{:s}".format('', indent, row) for row in rows]

        # Now we have a message and a listing that we can just join together with a newline.
        raise internal.exceptions.UnknownPrototypeError('\n'.join(itertools.chain([message, '', listing_message], listing)))

    # These are just utility closures that can be attached to the entrypoint closure.
    def fetch_score(cache, *arguments, **keywords):
        '''Return a list of all the callables, their (transformed) parameters, and their score for the given `arguments` and `keywords`.'''
        tree, table = cache
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        scores = {F : cls.critique_and_transform(F, packed_parameters, tree, table) for F, _ in candidates}
        iterable = cls.preordered(packed_parameters, candidates, tree, table)
        ordered = ((F, scores[F]) for F, _ in iterable)
        return [(F, score, args, kwargs) for F, (args, kwargs, score) in ordered]

    def fetch_callable(cache, *arguments, **keywords):
        '''Return the first callable that matches the constraints for the given `arguments` and `keywords`.'''
        tree, table = cache
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        F, _ = next(cls.preordered(packed_parameters, candidates, tree, table))
        return F

    def fetch_prototype(documentation, callable):
        '''Return the documentation prototype for the given `callable`.'''
        prototype, _, _ = documentation[callable]
        return prototype

    # This is the entry-point closure that gets used to update the actual wrapper with a new candidate.
    def result(wrapped):
        '''Return a function that will call the given `wrapped` function if the parameters meet the required type constraints.'''
        flattened_constraints = {}
        for argname, types in t_args.items():
            unordered = types if isinstance(types, internal.types.unordered) else [types]
            flattened_constraints[argname] = tuple(cls.flatten(unordered))

        # First we need to extract the function from whatever type it is
        # so that we can read any properties we need from it. We also extract
        # its "constructor" so that we can re-create it after we've processed it.
        try:
            cons, func = cls.reconstructor(wrapped), cls.ex_function(wrapped)
            if not callable(func):
                raise internal.exceptions.InvalidTypeOrValueError
        except internal.exceptions.InvalidTypeOrValueError:
            logging.warning("{:s}(...): Refusing to create a case for a non-callable object ({!s}).".format(qualified_name, wrapped))
            return wrapped

        # Next we need to extract all of the argument information from it. We also need to
        # determine whether it's a special type of some sort so that we know that its first
        # argument is irrelevant for our needs. With regards to this, we can't do anything
        # for methods since we see them before they get attached. However, we can explicitly
        # check if it's using the magic name "__new__" which uses an implicit parameter.
        args, _, (_, _) = cls.ex_args(func)
        s_args = 1 if isinstance(wrapped, (internal.types.classmethod, internal.types.method)) or func.__name__ in {'__new__'} else 0

        # If the user decorated us whilst explicitly providing the previous
        # function that this case is a part of, then make sure that we use it.
        if len(other):
            ok, prev = True, other[0]

        # If we weren't given a function, then we need to be tricky and search
        # through our parent frame's locals. Hopefully it's using the same name.
        elif pycompat.function.name(func) in sys._getframe().f_back.f_locals:
            ok, prev = True, sys._getframe().f_back.f_locals[pycompat.function.name(func)]

        # Otherwise, we've hit first blood and this is the very first definition
        # of the function. This requires us to do some construction later.
        else:
            ok = False

        # So if we found an already-existing wrapper, then we need to steal its cache.
        res = ok and prev and cls.ex_function(prev)
        if ok and hasattr(res, cls.cache_name):
            cache, documentation = getattr(res, cls.cache_name), getattr(res, cls.documentation_name)

        # Otherwise, we simply need to create a new cache entirely.
        else:
            cache, documentation = ({}, {}), {}
            res = cls.new_wrapper(func, cache, Fmissing=functools.partial(missing, documentation=documentation, ignored_parameter_count=s_args))
            res.__module__ = getattr(wrapped, '__module__', getattr(func, '__module__', '__main__'))

        # Update our new wrapper with our cache (tree and table) and our
        # documentation state so that it not only works but it reads too.
        setattr(res, cls.cache_name, cache)
        setattr(res, cls.documentation_name, documentation)

        # Attach a few hardcoded utilities to the closure that we return. We add
        # some empty namespaces as the first argument if it's a classmethod or __new__.
        setattr(res, 'score', functools.partial(fetch_score, cache, *s_args * [object]))
        setattr(res, 'fetch', functools.partial(fetch_callable, cache, *s_args * [object]))
        setattr(res, 'describe', functools.partial(fetch_prototype, documentation))

        # Now we need to add the function we extracted to our tree of candidate functions.
        # We hand over a copy of the constraints so that whatever could not be applied
        # to a parameter can be examined afterwards.
        constraints = dict(flattened_constraints)
        tree, table = cache
        F = cls.add(func, constraints, tree, table)
        assert(F is func)

        # Verify that we constrained all of the available types. If any are left, then
        # we need to complain about it since we just added the case to our tree.
        if constraints:
            co = pycompat.function.code(func)
            description = '.'.join(getattr(func, attribute) for attribute in ['__module__', '__name__'] if hasattr(func, attribute))
            location = "{:s}:{:d}".format(pycompat.code.filename(co), pycompat.code.linenumber(co))
            error_constraints = {name : describe_constraint(name, types, ordered=True) for name, types in flattened_constraints.items()}
            logging.warning(u"@{:s}(...) : Unable to constrain {:d} parameter{:s} ({:s}) for prototype \"{:s}({:s})\" at {:s}.".format(qualified_name, len(constraints), '' if len(constraints) == 1 else 's', ', '.join(constraints), description, ', '.join(error_constraints.get(name, name) for name in args[s_args:]), location))

        # Now we can use the information we collected about the function to
        # generate a documentation entry. Once we do this, then we completely
        # regenerate the documentation so that it's always up to date.
        documentation[func] = cls.render_documentation(func, flattened_constraints, args[:s_args])
        res.__doc__ = cls.document(documentation)

        # ..and then we can restore the original wrapper in all of its former glory.
        return cons(res)

    # Validate the types of all of our arguments and raise an exception if it used
    # an unsupported type.
    for name, types in t_args.items():
        if isinstance(types, (internal.types.type, internal.types.tuple)) or types is internal.types.callable:
            continue
        raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : The value ({!s}) specified for parameter \"{:s}\" is not a supported type.".format(qualified_name, describe_keywords(), types, string.escape(name, '"')))

    # Validate the types of our arguments that we were asked to decorate with, this
    # way we can ensure that our previously decorated functions are actually of the
    # correct type. We do this strictly to assist with debugging.
    try:
        [cls.ex_function(item) for item in other]
    except Exception:
        raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : The specified callable{:s} {!r} {:s} not of a valid type.".format(qualified_name, describe_keywords(), '' if len(other) == 1 else 's', other, 'is' if len(other) == 1 else 'are'))

    # If we were given an unexpected number of arguments to decorate with, then
    # raise an exception. This is strictly done to assist with debugging.
    if len(other) > 1:
        names = ', '.join("\"{:s}\"".format(string.escape(pycompat.code.name(c) if isinstance(c, internal.types.code) else c.__name__, '"')) for c in other)
        raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : More than one callable ({:s}) was specified to add a case to. Refusing to add cases to more than one callable.".format(qualified_name, describe_keywords(), names))

    return result
@classmethod
def document(cls, descriptions):
    '''Return the documentation for a multicased function rendered from the given `descriptions`.

    Each value in `descriptions` is unpacked as a prototype, the lines that
    describe it, and a third item which is not used here. Every prototype is
    padded to the width of the longest one so that the arrows line up.
    '''
    arrow, indent = ' -> ', 4
    width = max(len(prototype) for prototype, _, _ in descriptions.values()) if descriptions else 0

    rendered = []
    for F in cls.sorted_documentation(descriptions):
        prototype, lines, _ = descriptions[F]
        pointed = "{: <{:d}s}{:s}".format(prototype, width, arrow)

        # A case without any description only contributes its prototype.
        if not len(lines):
            rendered.append(prototype)
            continue

        # The first line shares a row with the prototype, whereas the rest of
        # the lines are shifted past the arrow by the configured indent.
        remaining = iter(lines)
        rendered.append(''.join([pointed, next(remaining)]))
        padding = ' ' * (indent + len(pointed))
        rendered.extend(''.join([padding, "{:s}".format(item)]) for item in remaining)
    return '\n'.join(rendered)
@classmethod | |
def filter_candidates(cls, candidates, packed_parameters, tree, table): | |
'''Critique the arguments in `packed_parameters` against the branch that is given in `candidates`.''' | |
args, _ = packed_parameters | |
parameters = (arg for arg in args) | |
# First we need a closure that's driven simply by the parameter value. This gets | |
# priority and will just descend through the tree until nothing is left. | |
def critique_unnamed(parameter, branch): | |
results = [] | |
for F, node in branch: | |
parameter_name, index, _ = node | |
discard_and_required, critique_and_transform, _ = table[F, index] | |
parameter_critique, parameter_transform, _ = critique_and_transform | |
# Now we take the parameter value, transform it, and then critique it. | |
try: value = parameter_transform(parameter) if parameter_transform else parameter | |
except Exception: continue | |
if parameter_critique(value) if parameter_critique else True: | |
results.append((F, node)) | |
continue | |
return results | |
# This function is only needed for processing arguments, because if there aren't any | |
# then all the nodes at a 0-height in our tree need to be checked for termination. | |
assert(args) | |
# Iterate through all of our parameters until we're finished or out of parameters. | |
iterable = ((F, tree[index][F]) for F, index in candidates) | |
branch = [(F, (name, index, next)) for F, (name, index, next) in iterable if index >= 0] | |
for index, parameter in enumerate(parameters): | |
candidates = critique_unnamed(parameter, branch) | |
branch = [(F, tree[next][F]) for F, (_, _, next) in candidates if next >= 0 and (F, next) in table] | |
return [(F, index) for F, (_, index, next) in candidates if index == next or next >= 0] | |
@classmethod | |
def filter_args(cls, packed_parameters, tree, table): | |
'''Critique the arguments from `packed_parameters` using the provided `tree` and `table`.''' | |
args, _ = packed_parameters | |
# If there are some parameters, then start out by only considering functions | |
# which support the number of args we were given as candidates. | |
if args: | |
count = len(args) | |
results = {F for F in tree.get(count - 1, [])} | |
# Next, we go through all of the available functions to grab only the ones | |
# which can take varargs and are smaller than our arg count. | |
iterable = ((F, pycompat.function.code(F)) for F in tree.get(0, [])) | |
unknowns = {F for F, c in iterable if pycompat.code.flags(c) & cls.CO_VARARGS and pycompat.code.argcount(c) <= count} | |
# Now we turn them into a branch and then we can process the arguments. | |
candidates = [(F, 0) for F in results | unknowns] | |
return cls.filter_candidates(candidates, packed_parameters, tree, table) | |
# If there are no parameters, then return everything. This really should only be done | |
# by multicase.match, but we're doing this here for the sake of the unit-tests. | |
branch = tree[0].items() | |
return [(F, index) for F, (_, index, next) in branch if index == next or index >= 0] | |
@classmethod | |
def filter_keywords(cls, candidates, packed_parameters, tree, table): | |
'''Critique the keywords from the `packed_parameters` against the branch given in `candidates`.''' | |
args, kwds = packed_parameters | |
def critique_names(kwds, branch): | |
results, keys = [], {name for name in kwds} | |
for F, node in branch: | |
parameter_name, index, _ = node | |
discard_and_required, critique_and_transform, wildargs = table[F, index] | |
# Extract our sets that are required for the callable to be | |
# considered a candidate and filter our keywords depending | |
# on whether we're processing named parameters or wild ones. | |
discard, required = discard_and_required | |
available = keys - discard if wildargs else keys & required | |
# If we still have any parameters available and their names | |
# don't matter (wild), then critique and what we just consumed. | |
if available and wildargs: | |
parameter_critique, parameter_transform, _ = critique_and_transform | |
parameters = (kwds[name] for name in available) | |
transformed = map(parameter_transform, parameters) if parameter_transform else parameters | |
critique = parameter_critique if parameter_critique else lambda parameter: True | |
results.append((F, node)) if all(critique(parameter) for parameter in transformed) else None | |
# If our parameter name is available, then we can critique it. | |
elif available and parameter_name in available: | |
parameter_critique, parameter_transform, _ = critique_and_transform | |
try: parameter = parameter_transform(kwds[parameter_name]) if parameter_transform else kwds[parameter_name] | |
except Exception: continue | |
critique = parameter_critique if parameter_critique else lambda parameter: True | |
results.append((F, node)) if parameter_critique(parameter) else None | |
# Otherwise this parameter doesn't exist which makes it not a candidate. | |
else: | |
continue | |
continue | |
return results | |
# Process as many keywords as we have left... | |
branch = [(F, tree[index][F]) for F, index in candidates if (F, index) in table] | |
for count in range(len(kwds)): | |
critiqued = critique_names(kwds, branch) | |
# If processing this keyword resulted in a loop (varargs), then | |
# promote it to a wildargs so we can check the rest of the kwargs. | |
candidates = [(F, -1 if index == next else index) for F, (_, index, next) in critiqued] | |
branch = [(F, tree[-1 if index == next else next][F]) for F, (_, index, next) in critiqued if (F, -1 if index == next else next) in table] | |
return candidates | |
@classmethod | |
def critique_and_transform(cls, F, packed_parameters, tree, table): | |
'''Critique and transform the `packed_parameters` for the function `F` returning the resolved parameters and a bias for the number of parameters we needed to transform.''' | |
args, kwds = packed_parameters | |
results, keywords = [], {name : value for name, value in kwds.items()} | |
counter = 0 | |
# First process each of the arguments and add them to our results. | |
_, index, _ = tree[0][F] | |
for arg in args: | |
_, critique_and_transform, _ = table[F, index] | |
parameter_critique, parameter_transform, parameter_constraints = critique_and_transform | |
parameter = parameter_transform(arg) if parameter_transform else arg | |
assert(parameter_critique(parameter) if parameter_critique else True) | |
#counter = counter if arg is parameter else counter + 1 | |
#counter = counter if arg == parameter else counter + 1 | |
#counter = counter if id(arg) == id(parameter) and parameter_critique else counter + 1 if parameter_critique else counter + 2 | |
if parameter_critique: | |
counter = counter + 2 if id(arg) != id(parameter) else counter if arg.__class__ in parameter_constraints else counter + 1 | |
else: | |
counter = counter + 3 | |
results.append(parameter) | |
_, _, index = tree[index][F] | |
# First check if we have any other parameters we need to process | |
# because if we don't, then we can just quit while we're ahead. | |
if not keywords and (F, index) not in table: | |
return results, keywords, counter | |
assert((F, index) in table) | |
# Now since we have keywords left to process, we need to ensure | |
# that we still have parameters to complete or we need to promote | |
# ourselves to -1 so that we can critique the keywords leftover. | |
_, index, next = tree[index][F] | |
index = -1 if index == next else index if (F, next) in table else index | |
# Now we can process the rest of our function arguments using whatever | |
# keywords that are available until our index becomes less than 0. | |
while index >= 0 and (F, index) in table: | |
name, index, next = tree[index][F] | |
_, critique_and_transform, wild = table[F, index] | |
parameter_critique, parameter_transform, parameter_constraints = critique_and_transform | |
if index >= 0 and name and not wild: | |
arg = keywords.pop(name) | |
parameter = parameter_transform(arg) if parameter_transform else arg | |
assert(parameter_critique(parameter) if parameter_critique else True) | |
#counter = counter if arg is parameter else counter + 1 | |
#counter = counter if arg == parameter else counter + 1 | |
#counter = counter if id(arg) == id(parameter) and parameter_critique else counter + 1 if parameter_critique else counter + 2 | |
if parameter_critique: | |
counter = counter + 2 if id(arg) != id(parameter) else counter if arg.__class__ in parameter_constraints else counter + 1 | |
else: | |
counter = counter + 3 | |
results.append(parameter) | |
index = next | |
# Despite this assertion not being exhaustive, if we ended up with some | |
# keywords leftover then our function and index should be in the table. | |
assert((F, index) in table if keywords else True) | |
# That should be all of our named arguments, so whatever is left should | |
# be the keyword arguments that belong to the wildargs candidates. | |
return results, keywords, counter | |
@classmethod | |
def ordered(cls, candidates, tree, table): | |
'''Yield the given `candidates` in the correct order using `tree` and `table`.''' | |
iterable = reversed(sorted(candidates, key=operator.itemgetter(1))) | |
items = [ item for item in iterable ] | |
# First we yield all of the items that have successfully terminated. | |
iterable = ( item for item in items if item not in table ) | |
for F, index in iterable: | |
yield F, index | |
# Now we can yield the callable that we resolved the most parameters with. | |
count, items = 0, [item for item in items if item in table] | |
for F, index in items[count:]: | |
yield F, index | |
count += 1 | |
# Afterwards, we just yield the rest which should all be wildarg parameters. | |
for F, index in items[count:]: | |
yield F, index | |
return | |
@classmethod | |
def preordered(cls, packed_parameters, candidates, tree, table): | |
'''Yield the callable and transformed `packed_parameters` from `candidates` using `tree` and `table` in the correct order.''' | |
results = {} | |
[ results.setdefault(index, []).append(F) for F, index in candidates ] | |
order = [index for index in reversed(sorted(results))] | |
for index in order: | |
items = [ F for F in results[index] if (F, index) not in table ] | |
# If we have more than one match, then we need to pre-sort this | |
# by whatever their bias is so that we choose the right one. | |
if len(items) > 1: | |
biased, iterable = {}, (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items) | |
#biased = {bias : (F, (args, kwds)) for F, (args, kwds, bias) in zip(items, iterable) } | |
[ biased.setdefault(bias, []).append((F, (args, kwds))) for F, (args, kwds, bias) in zip(items, iterable) ] | |
ordered = itertools.chain(*(biased[key] for key in sorted(biased))) | |
# Otherwise, we don't need to sort and can take the first one. | |
else: | |
iterable = (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items) | |
ordered = ((F, (args, kwds)) for F, (args, kwds, _) in zip(items, iterable)) | |
# Now we have the biased order and the parameters to use, so yield | |
# them to the caller so that it can actually be executed. | |
for F, packed in ordered: | |
yield F, packed | |
# Now we iterate through the results that actually do exist | |
# because they're either variable-length parameters or wild. | |
items = [ F for F in results[index] if (F, index) in table ] | |
# Similarly, if we have more than one match here, then we need | |
# to critique_and_transform the parameters and sort by bias. | |
if len(items) > 1: | |
biased, iterable = {}, (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items) | |
#biased = {bias : (F, (args, kargs)) for F, (args, kargs, bias) in zip(items, iterable) } | |
[ biased.setdefault(bias, []).append((F, (args, kwds))) for F, (args, kwds, bias) in zip(items, iterable) ] | |
ordered = itertools.chain(*(biased[key] for key in sorted(biased))) | |
# There's only one match, so that's exactly what we'll return. | |
else: | |
iterable = (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items) | |
ordered = ((F, (args, kwds)) for F, (args, kwds, _) in zip(items, iterable)) | |
# Yield what we found and continue to the next match. | |
for F, packed in ordered: | |
yield F, packed | |
continue | |
return | |
# For the following methods, we could expose another decorator that allows
# one to specifically update the critique_and_transform field inside the
# parameter table, but for simplicity (and backwards compatibility) we only
# use a single decorator and explicitly transform the type with these.
@classmethod | |
def parameter_critique(cls, *types): | |
'''Return a callable that critiques its parameter for any of the given `types`.''' | |
unsorted_types = {item for item in types} | |
# If there are no types, then we can bail here because critique should always succeed. | |
if not unsorted_types: | |
return None | |
# Filter our types for things that are not actual types. This is okay since we | |
# should be using unsorted_types to check whether to include our conditions and | |
# we need this so that we can use the types we were given with isinstance(). | |
filtered = tuple(item for item in unsorted_types if isinstance(item, type)) | |
if 1 < operator.sub(*reversed(sorted(map(len, [unsorted_types, filtered])))): | |
invalid = unsorted_types - {item for item in itertools.chain(filtered, [callable])} | |
parameters = [item.__name__ if isinstance(item, type) else item.__name__ if item in {callable} else "{!r}".format(item) for item in types] | |
raise internal.exceptions.InvalidParameterError(u"{:s}.parameter_critique({:s}) : Refusing to critique a parameter using {:s} other than a type ({:s}).".format('.'.join([__name__, cls.__name__]), ', '.join(parameters), 'an object' if len(invalid) == 1 else 'objects', ', '.join(map("{!r}".format, invalid)))) | |
types = filtered | |
# Add our default condition that ensures that the parameter is a concrete type. | |
Finstance = lambda item, types=types: isinstance(item, types) | |
conditions = [Finstance] | |
# Add other conditions in case the type can be transformed to the correct one. | |
if {item for item in internal.types.integer} & unsorted_types: | |
conditions.append(lambda item: hasattr(item, '__int__')) | |
# XXX: there's literally no reason to detect parameters that can be coerced to a string. | |
#if {item for item in internal.types.string} & unsorted_types: | |
# conditions.append(lambda item: hasattr(item, '__str__')) | |
if callable in unsorted_types: | |
conditions.append(callable) | |
# Now we need to determine whether we combine our tests or only need the first one. | |
Fany = lambda item, conditions=conditions: any(F(item) for F in conditions) | |
return Fany if len(conditions) > 1 else Finstance | |
@classmethod | |
def parameter_transform(cls, *types): | |
'''Return a callable that transforms its parameter to any of the given `types`.''' | |
unsorted_types = {item for item in types} | |
# If there are no types, then we can bail here because there's no transform required. | |
if not unsorted_types: | |
return None | |
# Filter our types so that we can use them with isinstance() as we'll be | |
# using the unsorted_types set to check for type membership. | |
filtered = tuple(item for item in unsorted_types if isinstance(item, type)) | |
if 1 < operator.sub(*reversed(sorted(map(len, [unsorted_types, filtered])))): | |
invalid = unsorted_types - {item for item in itertools.chain(filtered, [callable])} | |
parameters = [item.__name__ if isinstance(item, type) else item.__name__ if item in {callable} else "{!r}".format(item) for item in types] | |
raise internal.exceptions.InvalidParameterError(u"{:s}.parameter_transform({:s}) : Refusing to transform a parameter using {:s} other than a type ({:s}).".format('.'.join([__name__, cls.__name__]), ', '.join(parameters), 'an object' if len(invalid) == 1 else 'objects', ', '.join(map("{!r}".format, invalid)))) | |
types = filtered | |
# Create a list that includes the condition for a transformation and the | |
# transformation itself. If it's not one of these, then we leave it alone. | |
transformers = [] | |
# Figure out the conditions that require us to avoid transforming the item. | |
Fidentity = lambda item: item | |
if callable in unsorted_types and len(unsorted_types) == 1: | |
transformers.append((Fidentity, callable)) | |
# If there's no callables in our unsorted_types, then we can just use isinstance. | |
elif callable not in unsorted_types: | |
transformers.append((Fidentity, lambda item, types=types: isinstance(item, types))) | |
# Otherwise there's some types and a callable that we need to split into two transformations. | |
else: | |
transformers.append((Fidentity, lambda item, types=types: isinstance(item, types))) | |
transformers.append((Fidentity, callable)) | |
# Figure out whether we need to add additional transformations...just in case. | |
if {item for item in internal.types.integer} & unsorted_types: | |
transformers.append((int, lambda item: hasattr(item, '__int__'))) | |
# XXX: i'm pretty much just showing off here, as there's literally no reason to attempt string coercion. | |
#if {item for item in internal.types.string} & unsorted_types: | |
# transformers.append((unicode if str == bytes else str, lambda item: hasattr(item, '__str__'))) | |
# Now we figure out if we need to do any actual transformations by checking whether the | |
# number of transformers we collected is larger than 1. This is because the first transformer | |
# will always be the identity function when the item type is correct. | |
Ftransform = lambda item, transformers=transformers: next((F(item) for F, condition in transformers if condition(item)), item) | |
return Ftransform if len(transformers) > 1 else Fidentity | |
# XXX: Our main decorator that is responsible for updating the decorated function. | |
@classmethod
def add(cls, callable, constraints, tree, table):
    '''Add the `callable` with the specified type `constraints` to both the `tree` and `table`.

    The `tree` maps a parameter index to a dictionary of callables, where each
    callable holds a tuple of `(name, index, next_index)`. The `table` is keyed
    by `(callable, index)` and holds a tuple composed of the `(discard, required)`
    name sets, the `(critique, transform, flattened_constraints)` triple, and the
    name of the wild (keyword) parameter or an empty string.

    The entries within `constraints` are popped as they are consumed, so the
    dictionary that is given by the caller gets modified. The `callable` is
    returned as-is.
    '''
    args, _, (varargs, wildargs) = cls.ex_args(callable)
    Fflattened_constraints = lambda types: {item for item in cls.flatten(types if isinstance(types, internal.types.unordered) else [types])}

    def Fchecks(name):
        '''Pop the constraints for `name` and return its critique, its transform, and its flattened constraints.'''
        t = constraints.pop(name, ())
        return cls.parameter_critique(*t), cls.parameter_transform(*t), Fflattened_constraints(t)

    # Extract the parameter names and the types that the callable was
    # decorated with so that we can generate the functions used to
    # transform and critique the value that determines its validity.
    parameter_checks = [Fchecks(name) for name in args]

    # Generate two sets that are used to determine what parameter names
    # are required for this wrapped function to still be considered.
    parameter_sets = [(set(args[:index]), set(args[index:])) for index, _ in enumerate(args)]

    # Zip up our parameters with both lists so that we can add an entry for
    # each parameter. The table is keyed by the callable and the parameter
    # index, so that is the key we verify hasn't already been registered.
    for (index, name), sets, checks in zip(enumerate(args), parameter_sets, parameter_checks):
        assert((callable, index) not in table)
        table[callable, index] = sets, checks, wildargs if wildargs == name else ''
        tree.setdefault(index, {})[callable] = name, index, 1 + index

    # We should be done, but in case there's var args or wild args (keywords), then
    # we'll need to create some cycles within our tree and table. None of these entries
    # hold anything of value, but they need to hold something.. So we create some defaults.
    discard_and_required = set(args), set()
    critique_and_explode = operator.truth, lambda item: False, ()
    critique_and_continue = operator.truth, lambda item: True, ()

    # If there are no parameters whatsoever, then we need a special case
    # which gets used at the first pass of our parameter checks. Essentially
    # we treat this as a vararg, but without the loop. This way if it does
    # turn out to be a vararg or wildarg, it will get fixed to add the loop.
    if not args:
        discard_and_impossible = set(args), {None}

        # If we don't have any parameters, then our first parameter should
        # immediately fail. If we're variable-length'd or wild, then the
        # conditionals that follow this will overwrite this with a loop.
        table[callable, len(args)] = discard_and_impossible, critique_and_explode, ''
        tree.setdefault(len(args), {})[callable] = '', 0, -1

    # If both are selected, then we need to do some connections here.
    if varargs and wildargs:
        # NOTE(review): only the constraints for the wild parameter are consumed
        # here and they get used for the variable-length loop too. Any constraints
        # given for the variable-length parameter are left alone -- confirm intended.
        critique_and_transform = Fchecks(wildargs)

        # Since this callable is variable-length'd, we create a loop in our tree
        # and table so that we can consume any number of parameters.
        table[callable, len(args)] = discard_and_required, critique_and_transform, wildargs
        tree.setdefault(len(args), {})[callable] = varargs, len(args), len(args)

        # Since the callable is also wild, we need to create a loop outside the
        # count of parameters (-1). This way we can promote a loop to this path.
        tree.setdefault(-1, {})[callable] = wildargs, -1, -1
        table[callable, -1] = discard_and_required, critique_and_transform, wildargs

    # If there's variable-length parameters, then we simply need to create a loop.
    elif varargs:
        critique_and_transform = Fchecks(varargs)

        # We can't really match against the parameter name with variable-length
        # parameters, so we add it as a loop for an empty string (unnamed).
        tree.setdefault(len(args), {})[callable] = varargs, len(args), len(args)
        table[callable, len(args)] = discard_and_required, critique_and_transform, ''

    # Pop out the wild (keyword) parameter type from our decorator parameters.
    elif wildargs:
        critique_and_transform = Fchecks(wildargs)

        # We need to go through our type parameters and update our table
        # so that it includes any wild keyword parameters.
        tree.setdefault(-1, {})[callable] = wildargs, -1, -1
        table[callable, -1] = discard_and_required, critique_and_transform, wildargs

        # Create our sentinel entry in the tree so that when we run out
        # of args, we transfer to the keyword parameters (-1) to continue.
        tree.setdefault(len(args), {})[callable] = '', len(args), -1
        table[callable, len(args)] = discard_and_required, critique_and_continue, wildargs
    return callable
@classmethod
def flatten(cls, iterable):
    '''Yield every distinct element of the `iterable` (or tree) in the order that it is first seen, descending into any nested containers.'''
    seen = set()
    for entry in iterable:
        # Containers get expanded recursively, whereas anything else
        # is treated as a single leaf that we might need to yield.
        if isinstance(entry, internal.types.unordered):
            leaves = cls.flatten(entry)
        else:
            leaves = [entry]

        # Only yield the leaves that we haven't given to the caller yet.
        for leaf in leaves:
            if leaf not in seen:
                yield leaf
                seen.add(leaf)
    return
@classmethod
def render_documentation(cls, function, constraints, ignored):
    '''Return a tuple of the prototype, the documentation lines, and the ordered constraints for `function`.

    The prototype is rendered with the given `constraints` while skipping over
    any `ignored` parameters. The constraints are returned as a tuple in the
    order of the parameter names with an empty tuple for unconstrained ones.
    '''
    names, _, (star, starstar) = cls.ex_args(function)

    # Collect every parameter name that is not empty, including the
    # variable-length and keyword parameter names (if they exist).
    parameters = [name for name in itertools.chain(names, [star, starstar]) if name]
    rendered = cls.prototype(function, constraints, ignored)

    # Strip each line of the documentation individually, unless
    # the documentation contains nothing other than whitespace.
    text = function.__doc__ or ''
    if text.strip():
        body = [line.strip() for line in text.split('\n')]
    else:
        body = []

    ordered = tuple(constraints.get(name, ()) for name in parameters)
    return rendered, body, ordered
# Weights used to bias the order of our documentation. The container types
# are given a weight of 1 and the integer and string types are given a
# weight of 2, with the latter taking priority if a type is in both.
documentation_bias = dict.fromkeys(itertools.chain(internal.types.ordered, internal.types.unordered), 1)
documentation_bias.update(dict.fromkeys(itertools.chain(internal.types.integer, internal.types.string), 2))
@classmethod
def sorted_documentation(cls, descriptions):
    '''Return the callables from `descriptions` sorted by the size of their documentation, their parameter count, and their constraint bias.'''
    weights = cls.documentation_bias

    # Build a sort key for each callable that is composed of the number of newlines in
    # its stripped documentation, the number of parameters, and the sum of the largest
    # bias for each parameter (an unconstrained parameter contributes nothing).
    keyed = []
    for F, (_, _, constraints) in descriptions.items():
        text = (pycompat.function.documentation(F) or '').strip()
        bias = sum(max(weights.get(item, 0) for item in group) if group else 0 for group in constraints)
        keyed.append(((text.count('\n'), len(constraints), bias), F))

    # Now we can sort by the key that we generated and return only the callables.
    keyed.sort(key=operator.itemgetter(0))
    return [F for _, F in keyed]
@classmethod
def prototype(cls, function, constraints={}, ignored={item for item in []}):
    '''Generate a prototype for the given `function` and `constraints` while skipping over the `ignored` argument names.

    The result is a string formatted as "name(arg, arg=type, *star, **starstar)". If
    any of the names in `constraints` are not one of the positional arguments for the
    `function`, then a warning is logged. Neither `constraints` nor `ignored` are
    modified, so their shared default values are safe.
    '''
    args, defaults, (star, starstar) = cls.ex_args(function)

    def Femit_arguments(names, constraints, ignored):
        '''Yield a tuple for each individual parameter composed of the name and its constraints.'''

        # Iterate through all of our argument names. If any of them are within
        # our ignored items, however, then we can simply skip over them.
        for item in names:
            if item in ignored:
                continue

            # If the current argument name is not within our constraints, then
            # we only have to yield the argument name and move on.
            if item not in constraints:
                yield item, None
                continue

            # Figure out which constraint to use for each item, and yield how
            # it should be represented back to the caller. We compare against
            # the callable by identity so that an unhashable constraint (such
            # as a list) makes it to the iterable case instead of raising.
            constraint = constraints[item]
            if isinstance(constraint, internal.types.type) or constraint is internal.types.callable:
                yield item, constraint.__name__
            elif hasattr(constraint, '__iter__'):
                yield item, '|'.join(each.__name__ for each in cls.flatten(constraint))
            else:
                yield item, "{!s}".format(constraint)
            continue
        return

    # Log any multicased functions that accidentally define type constraints for parameters
    # which don't actually exist. This is specifically done in order to aid debugging.
    unavailable = {constraint_name for constraint_name in constraints} - {argument_name for argument_name in args}
    if unavailable:
        co = pycompat.function.code(function)
        co_fullname, co_lineno = '.'.join([function.__module__, function.__name__]), co.co_firstlineno

        # A relative path can't be calculated on Windows when both paths are on
        # different drives, so we fall back to the path from the code object
        # rather than failing when all we are trying to do is log a warning.
        try:
            co_filename = os.path.relpath(co.co_filename, idaapi.get_user_idadir())
        except ValueError:
            co_filename = co.co_filename

        args_s = ', '.join(args) if args else ''
        star_s = ", *{:s}".format(star) if star and args else "*{:s}".format(star) if star else ''
        starstar_s = ", **{:s}".format(starstar) if starstar and (star or args) else "**{:s}".format(starstar) if starstar else ''
        proto_s = "{:s}({:s}{:s}{:s})".format(co_fullname, args_s, star_s, starstar_s)
        path_s = "{:s}:{:d}".format(co_filename, co_lineno)

        # The names are sorted so that the logged message is always the same.
        logging.warning("{:s}({:s}): Unable to constrain the type in {:s} for parameter{:s} ({:s}) at {:s}.".format('.'.join([__name__, cls.__name__]), co_fullname, proto_s, '' if len(unavailable) == 1 else 's', ', '.join(sorted(unavailable)), path_s))

    # Return the prototype for the current function with the provided parameter constraints.
    iterable = (item if parameter is None else "{:s}={:s}".format(item, parameter) for item, parameter in Femit_arguments(args, constraints, ignored))
    items = iterable, ["*{:s}".format(star)] if star else [], ["**{:s}".format(starstar)] if starstar else []
    return "{:s}({:s})".format(pycompat.function.name(function), ', '.join(itertools.chain(*items)))
@classmethod
def match(cls, packed_parameters, tree, table):
    '''Return a list of `(callable, index)` tuples from `tree` and `table` that are satisfied by the `packed_parameters`.'''
    args, kwargs = packed_parameters
    candidates = cls.filter_args(packed_parameters, tree, table)

    # Without any keywords, we only need to cull out the candidates
    # that are still capable of taking more positional parameters.
    if not kwargs:
        assert(all(index >= 0 for F, index in candidates))
        limit = len(args)

        # Keep only the candidates that transition no further than our parameter count.
        branch = []
        for F, position in candidates:
            _, _, target = tree[position][F]
            if target <= limit:
                branch.append((F, target))

        # Any transition that isn't in the table is a result that we can use. The
        # others get followed once more so that we can distinguish a candidate that
        # loops (varargs) from one that moves onto the keywords (wildargs).
        finished, looping, keyworded = [], [], []
        for F, target in branch:
            if (F, target) not in table:
                finished.append((F, target))
                continue

            _, current, following = tree[target][F]
            if current == following and current <= limit:
                looping.append((F, following))
            elif current != following and following < 0:
                keyworded.append((F, following))

        # That was everything, so we just need to return them in the correct order.
        return finished + looping + keyworded

    # If some args were processed, then each candidate needs to be shifted to its next
    # parameter or promoted to the keywords (-1) if it is currently looping on itself.
    if args:
        shifted = []
        for F, position in candidates:
            if (F, position) in table:
                _, current, following = tree[position][F]
                shifted.append((F, -1 if current == following else following))
        candidates = shifted

    # Now each candidate should be at the right place in their tree and we can
    # filter them by keyword. Afterwards, we keep the ones that have ended up
    # at the keywords or that have no more parameters remaining in the table.
    matches = []
    for F, position in cls.filter_keywords(candidates, packed_parameters, tree, table):
        _, _, following = tree[position][F]
        if following < 0 or (F, following) not in table:
            matches.append((F, following))
    return matches
@classmethod
def new_wrapper(cls, func, cache, Fmissing=None, Fdebug_candidates=None):
    '''Return a new function named after `func` that dispatches to the matching callable from the `cache`.

    If nothing matches the parameters, the result of `Fmissing` is returned when it
    is defined, otherwise a `RuntimeError` is raised. If `Fdebug_candidates` is
    defined, it is given all of the matches prior to the first one being called.
    '''
    tree, table = cache

    # This closure is what ends up being called. Its name and its locals are left
    # alone on purpose since the properties of its code object get copied below.
    def F(*arguments, **keywords):
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        iterable = cls.preordered(packed_parameters, candidates, tree, table)

        # Grab the first match. If there wasn't one, then hand off whatever we tried
        # to match to the missing-hook or raise an exception if there is no hook.
        result = next(iterable, None)
        if result is None:
            if Fmissing is not None:
                return Fmissing(packed_parameters, tree, table)
            raise RuntimeError(packed_parameters, tree, table)

        # The debug-hook gets to see every match including the one we took.
        if Fdebug_candidates is not None:
            res = Fdebug_candidates(itertools.chain([result], iterable), packed_parameters, tree, table)
            assert(res is None)

        # All that's left is to call what we matched with its parameters.
        F, (args, kwds) = result
        return F(*args, **kwds)

    # Copy the properties of the code object from our closure, but replace its
    # name with the full name of the function that is being wrapped.
    code = pycompat.function.code(F)
    cargs = (
        code.co_argcount, code.co_nlocals, code.co_stacksize, code.co_flags,
        code.co_code, code.co_consts, code.co_names, code.co_varnames,
        code.co_filename, '.'.join([func.__module__, pycompat.function.name(func)]),
        code.co_firstlineno, code.co_lnotab, code.co_freevars, code.co_cellvars,
    )
    newcode = pycompat.code.new(cargs, pycompat.code.unpack_extra(code))

    # Create a function from the new code object using everything else from
    # our closure, and then assign the original name and documentation to it.
    result = pycompat.function.new(
        newcode,
        pycompat.function.globals(F),
        pycompat.function.name(F),
        pycompat.function.defaults(F),
        pycompat.function.closure(F),
    )
    pycompat.function.set_name(result, pycompat.function.name(func))
    pycompat.function.set_documentation(result, pycompat.function.documentation(func))
    return result
@classmethod
def ex_function(cls, object):
    '''Extract the actual function type from a callable.

    The `object` can be a function, a method, a code object, or a descriptor.
    Anything else results in an `InvalidTypeOrValueError` being raised. When
    given a code object, exactly one function with the same name must be
    referring to it or a `ValueError` is raised by the unpacking.
    '''
    if isinstance(object, internal.types.function):
        return object
    elif isinstance(object, internal.types.method):
        return pycompat.method.function(object)
    elif isinstance(object, internal.types.code):
        # Search whatever refers to the code object for the function that owns it. The
        # type is checked first, so that we only ask for the name of an actual function.
        name = pycompat.code.name(object)
        res, = (item for item in gc.get_referrers(object) if isinstance(item, internal.types.function) and pycompat.function.name(item) == name)
        return res
    elif isinstance(object, internal.types.descriptor):
        return object.__func__
    raise internal.exceptions.InvalidTypeOrValueError(object)
@classmethod
def reconstructor(cls, item):
    '''Return a closure that takes a function and wraps it back into the same kind of callable as `item`.'''
    types = internal.types

    # A plain function does not need to be wrapped with anything.
    if isinstance(item, types.function):
        def rebuild(f):
            return f

    # A method gets bound to the same instance and type as the original.
    elif isinstance(item, types.method):
        def rebuild(f):
            return pycompat.method.new(f, pycompat.method.self(item), pycompat.method.type(item))

    # A descriptor gets constructed using the type of the original.
    elif isinstance(item, types.descriptor):
        def rebuild(f):
            return type(item)(f)

    # An instance or a class gets created with a copy of the attributes from the function.
    elif isinstance(item, types.instance):
        def rebuild(f):
            return internal.types.InstanceType(type(item), dict(f.__dict__.items()))

    elif isinstance(item, types.class_t):
        def rebuild(f):
            return type(item)(item.__name__, item.__bases__, dict(f.__dict__.items()))

    # Anything else is not a type that we know how to reconstruct.
    else:
        raise internal.exceptions.InvalidTypeOrValueError(type(item))
    return rebuild
@classmethod
def ex_args(cls, f):
    '''Extract the arguments from a function.

    Returns a tuple composed of the positional argument names, a dictionary
    mapping the argument names to their default values, and a tuple containing
    the names of the variable-length and keyword parameters. If either one of
    those parameters is not accepted by the function, its name is an empty string.
    '''
    c = pycompat.function.code(f)
    flags = pycompat.code.flags(c)
    varnames_count, varnames_iter = pycompat.code.argcount(c), (item for item in pycompat.code.varnames(c))
    args = tuple(itertools.islice(varnames_iter, varnames_count))

    # The defaults belong to the last arguments, so pair them up from the end.
    res = {a : v for v, a in zip(reversed(pycompat.function.defaults(f) or []), reversed(args))}

    # The names of any keyword-only parameters are stored between the positional
    # names and the star names, so we need to consume them or they'll be mistaken
    # for the variable-length or keyword parameter. If the code object does not
    # have this attribute, then there are none for us to skip.
    kwonly_count = getattr(c, 'co_kwonlyargcount', 0)
    for _ in itertools.islice(varnames_iter, kwonly_count):
        pass

    # Now the next names (if flagged) are the variable-length and keyword parameters.
    starargs = next(varnames_iter, "") if flags & cls.CO_VARARGS else ""
    kwdargs = next(varnames_iter, "") if flags & cls.CO_VARKEYWORDS else ""
    return args, res, (starargs, kwdargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There's probably a bug here with functions that use keywords, specifically when transitioning from the parameters to the keywords. This transition is marked by `critique_and_continue`, and it might result in another parameter being consumed.