Skip to content

Instantly share code, notes, and snippets.

@jeetsukumaran
Last active May 19, 2017 00:12
Show Gist options
  • Select an option

  • Save jeetsukumaran/65fa8468a52d78ffdf28a459c362323f to your computer and use it in GitHub Desktop.

Select an option

Save jeetsukumaran/65fa8468a52d78ffdf28a459c362323f to your computer and use it in GitHub Desktop.
Implicit Interface Discovery
#! /usr/bin/env python
import inspect
import sys
import collections
import re
class CallInfo(object):
    """
    Record of one kind of access (read or write) made by one caller.

    Two records are equal when they share the same fully-qualified caller
    name and the same access type; the hash is derived from exactly those
    two values so that equal records always land in the same set bucket.

    Attributes:
        access_type: Access flag as passed in (the loggers in this file
            pass ``"r"`` for reads and ``"w"`` for writes).
        calling_module_name: Name of the module the access came from.
        calling_class_name: Name of the caller's class, or a false value
            (e.g. ``""``) when the caller is not a method.
        calling_function_name: Name of the calling function or method.
        caller_fqname: Dotted ``module[.class].function`` name of the caller.
        hash_str: "+++"-joined access type and caller parts.
        hash_int: Cached hash used by ``__hash__``.
    """

    def __init__(self,
                 access_type,
                 calling_module_name,
                 calling_class_name,
                 calling_function_name,
                 ):
        self.access_type = access_type
        self.calling_module_name = calling_module_name
        self.calling_class_name = calling_class_name
        self.calling_function_name = calling_function_name
        # The class component is left out entirely for plain functions.
        if not self.calling_class_name:
            name_parts = [calling_module_name, calling_function_name]
        else:
            name_parts = [calling_module_name, calling_class_name, calling_function_name]
        self.caller_fqname = ".".join(name_parts)
        self.hash_str = "+++".join([
            self.access_type,
            self.calling_module_name,
            # ``or ""`` keeps the join valid when the class name is None.
            self.calling_class_name or "",
            self.calling_function_name,
        ])
        # Hash the same key that __eq__ compares.  Hashing ``hash_str``
        # instead would let equal objects hash differently, because
        # "pkg.mod" + "f" and "pkg" + "mod" + "f" yield the same
        # ``caller_fqname`` but different ``hash_str`` values.
        self.hash_int = hash((self.caller_fqname, self.access_type))

    def __eq__(self, o):
        # Defer to the other operand for foreign types instead of raising
        # AttributeError on the missing attributes.
        if not isinstance(o, CallInfo):
            return NotImplemented
        return o.caller_fqname == self.caller_fqname and o.access_type == self.access_type

    def __ne__(self, o):
        # Python 2 does not derive ``!=`` from ``__eq__``.
        result = self.__eq__(o)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return self.hash_int

    def __repr__(self):
        return "{}(access_type={!r}, caller_fqname={!r})".format(
            type(self).__name__, self.access_type, self.caller_fqname)
class ImplicitInterfaceInference(object):
    """
    Infer the implicit interface exposed to client code by logging the methods
    and attributes called on classes.

    ``log_class`` replaces a class's ``__getattribute__`` and ``__setattr__``
    with logging hooks. Each access coming from outside the class (and
    outside the excluded modules) is stored in ``call_registry`` as::

        call_registry[class_name][callee_type][member_name][caller_fqname]
            -> set of CallInfo

    where ``callee_type`` is ``"[METHODS]"`` or ``"[ATTRIBUTES]"``.
    """

    # Function names belonging to the logging machinery itself (or to
    # attribute hooks in general); accesses made from them are never logged.
    # ``__init__`` is not listed, so accesses made from constructors are
    # subject to the same module/class filters as any other caller.
    _INTERNAL_CALLER_NAMES = (
        "__setattr__",
        "__getattribute__",
        "_log_access",
        "_log_setattr",
        "_log_getattribute",
    )

    @staticmethod
    def _log_access(obj,
                    name,
                    call_registry,
                    calling_module_patterns_to_exclude,
                    access_type,
                    ):
        """
        Main interface logger.

        Must be called directly from one of the hook functions built by
        ``def_getattribute`` / ``def_setattr``: the client code is taken to
        be the frame two levels above this one.

        Args:
            obj: Instance whose attribute is being accessed.
            name: Name of the attribute being accessed.
            call_registry: Nested mapping that receives the log entry.
            calling_module_patterns_to_exclude: Iterable of compiled regular
                expressions; a caller whose module name ``match()``-es any of
                them is not logged.
            access_type: Flag stored on the resulting ``CallInfo`` (the
                hooks in this class pass ``"r"`` or ``"w"``).

        Returns:
            The value of ``obj.<name>`` as resolved by
            ``object.__getattribute__``.
        """
        ##########################################################################
        ## Reference to method/property/attribute being called
        accessed_target = object.__getattribute__(obj, name)

        ##########################################################################
        ## Avoid recursion: the lookups below read ``__class__`` on logged
        ## instances, which re-enters the hook.
        if name in ("__class__", "__name__"):
            return accessed_target
        current_class_name = obj.__class__.__name__

        ##########################################################################
        ## Locate the client frame: this frame -> hook -> client code.
        ## Walking ``f_back`` avoids ``inspect.getouterframes``, which builds
        ## a record with source context for every frame on the stack on each
        ## attribute access.
        this_frame = inspect.currentframe()
        try:
            calling_frame = this_frame.f_back.f_back
        except AttributeError:
            # No frame support, or the stack is too shallow to have a client.
            return accessed_target
        finally:
            # Do not keep a frame referring to itself through its own locals.
            del this_frame
        if calling_frame is None:
            return accessed_target

        ##########################################################################
        ## We just want the public interface, so avoid logging methods/attributes
        ## that have been called internally
        calling_function_name = calling_frame.f_code.co_name
        if calling_function_name in ImplicitInterfaceInference._INTERNAL_CALLER_NAMES:
            return accessed_target

        calling_module = inspect.getmodule(calling_frame)
        if calling_module is not None:
            calling_module_name = calling_module.__name__
        else:
            # ``inspect.getmodule`` cannot always map a frame to a module;
            # fall back to the name recorded in the frame's globals.
            calling_module_name = calling_frame.f_globals.get("__name__", "<unknown>")
        for pattern in calling_module_patterns_to_exclude:
            if pattern.match(calling_module_name):
                return accessed_target

        try:
            caller_self = calling_frame.f_locals["self"]
        except KeyError:
            # Caller has no ``self`` local: treat it as a plain function.
            calling_class_name = ""
        else:
            calling_class_name = caller_self.__class__.__name__
        # Same class name on both sides means an internal access.
        if current_class_name == calling_class_name:
            return accessed_target

        ##########################################################################
        ## Log the method/property/attribute being called
        if inspect.isfunction(accessed_target) or inspect.ismethod(accessed_target):
            callee_type = "[METHODS]"
        else:
            callee_type = "[ATTRIBUTES]"
        call_info = CallInfo(
            access_type=access_type,
            calling_module_name=calling_module_name,
            calling_class_name=calling_class_name,
            calling_function_name=calling_function_name,
        )
        callers = call_registry[current_class_name][callee_type][name]
        callers.setdefault(call_info.caller_fqname, set()).add(call_info)
        return accessed_target

    @staticmethod
    def def_getattribute(call_registry, calling_module_patterns_to_exclude):
        """
        This function will replace/override the default/defined
        ``__getattribute__`` of objects whose public interface we want to
        discover.

        Returns a function with the ``__getattribute__`` signature that logs
        the access as a read (``"r"``) and returns the attribute value.
        """
        def _log_getattribute(self, name):
            return ImplicitInterfaceInference._log_access(
                self,
                name=name,
                call_registry=call_registry,
                calling_module_patterns_to_exclude=calling_module_patterns_to_exclude,
                access_type="r",
            )
        return _log_getattribute

    @staticmethod
    def def_setattr(call_registry, calling_module_patterns_to_exclude):
        """
        This function will replace/override the default/defined
        ``__setattr__`` of objects whose public interface we want to
        discover.

        Returns a function with the ``__setattr__`` signature that performs
        the assignment via ``object.__setattr__`` and then logs the access
        as a write (``"w"``).
        """
        def _log_setattr(self, name, value):
            # Assign first: the logger reads the attribute back to classify it.
            object.__setattr__(self, name, value)
            ImplicitInterfaceInference._log_access(
                self,
                name=name,
                call_registry=call_registry,
                calling_module_patterns_to_exclude=calling_module_patterns_to_exclude,
                access_type="w",
            )
        return _log_setattr

    def __init__(self,
                 calling_module_patterns_to_exclude=None):
        """
        Args:
            calling_module_patterns_to_exclude: Optional iterable of compiled
                regular expressions; accesses from modules whose name
                ``match()``-es one of them are not logged.
        """
        # class name -> callee type -> member name -> {caller fqname: set(CallInfo)}
        self.call_registry = collections.defaultdict(
            lambda: collections.defaultdict(
                lambda: collections.defaultdict(dict)))
        if calling_module_patterns_to_exclude is None:
            self.calling_module_patterns_to_exclude = set()
        else:
            self.calling_module_patterns_to_exclude = set(calling_module_patterns_to_exclude)

    def log_class(self, class_ref):
        """
        Start logging attribute reads and writes on instances of ``class_ref``.

        Assigns new ``__getattribute__`` and ``__setattr__`` functions on the
        class itself; both record into this object's ``call_registry``.
        """
        class_ref.__getattribute__ = ImplicitInterfaceInference.def_getattribute(
            call_registry=self.call_registry,
            calling_module_patterns_to_exclude=self.calling_module_patterns_to_exclude,
        )
        class_ref.__setattr__ = ImplicitInterfaceInference.def_setattr(
            call_registry=self.call_registry,
            calling_module_patterns_to_exclude=self.calling_module_patterns_to_exclude,
        )

    def dump_registry_yaml(self,
                           out=None,
                           verbosity=0,
                           ):
        """
        Write the registry as a YAML-style document.

        Each nesting level is indented two spaces deeper than its parent so
        that members stay nested under their ``[METHODS]`` / ``[ATTRIBUTES]``
        key at every verbosity level.

        Args:
            out: Object with a ``write`` method. Defaults to ``sys.stdout``,
                looked up at call time so redirection is honoured.
            verbosity: ``0`` lists member names; ``1`` adds the access types
                seen for each member; ``2`` lists the callers of each member;
                ``3`` or more also adds the access types per caller.
        """
        if out is None:
            out = sys.stdout
        out.write("---\n")
        for class_name in sorted(self.call_registry):
            out.write("{}:\n".format(class_name))
            members_by_type = self.call_registry[class_name]
            for callee_type in sorted(members_by_type):
                out.write("  {}:\n".format(callee_type))
                members = members_by_type[callee_type]
                for member_name in sorted(members):
                    callers = members[member_name]
                    if verbosity >= 2:
                        out.write("    {}:\n".format(member_name))
                        for caller_fqname in sorted(callers):
                            if verbosity >= 3:
                                access_types = set(
                                    call_info.access_type
                                    for call_info in callers[caller_fqname])
                                # Sorted so the flags do not depend on set order.
                                access_info = " ({})".format("".join(sorted(access_types)))
                            else:
                                access_info = ""
                            out.write("      - {}{}\n".format(caller_fqname, access_info))
                    elif verbosity >= 1:
                        access_types = set()
                        for call_infos in callers.values():
                            access_types.update(
                                call_info.access_type for call_info in call_infos)
                        out.write("    - {} ({})\n".format(
                            member_name, "".join(sorted(access_types))))
                    else:
                        out.write("    - {}\n".format(member_name))
        out.write("...\n")
if __name__ == "__main__":
    # Example driver: log which parts of DendroPy's data model a tree reader
    # touches while parsing a file, then print the inferred interface.
    import dendropy
    from dendropy.datamodel import basemodel
    from dendropy.utility import container

    usage = "usage: iii.py <path/to/treefile> <treefile-format> [verbosity]"
    # Both the path and the format are required; verbosity is optional.
    # (Accepting a lone path would fail below with an IndexError.)
    if not 3 <= len(sys.argv) <= 4:
        sys.exit(usage)
    if len(sys.argv) >= 4:
        try:
            verbosity = int(sys.argv[3])
        except ValueError:
            sys.exit(usage)
    else:
        verbosity = 0
    data_path = sys.argv[1]
    schema = sys.argv[2]

    calling_module_patterns_to_exclude = set([
        re.compile(r"dendropy\.datamodel\.")  # exclude calls from WITHIN the ``dendropy.datamodel`` hierarchy
    ])
    i3 = ImplicitInterfaceInference(calling_module_patterns_to_exclude=calling_module_patterns_to_exclude)
    # Each class is hooked once; hooking a class again would only rebuild
    # equivalent hooks.
    i3.log_class(basemodel.DataObject)
    i3.log_class(basemodel.MultiReadable)
    i3.log_class(basemodel.Serializable)
    i3.log_class(basemodel.Annotable)
    i3.log_class(container.OrderedSet)
    # i3.log_class(basemodel.AnnotationSet)

    # The parsed tree itself is not needed: reading it is what exercises the
    # logged classes.
    dendropy.Tree.get(
        path=data_path,
        schema=schema)
    i3.dump_registry_yaml(verbosity=verbosity)
---
Annotation:
[ATTRIBUTES]:
- annotations
AnnotationSet:
[METHODS]:
- add_new
Edge:
[ATTRIBUTES]:
- annotations
- label
- length
- tail_node
Node:
[ATTRIBUTES]:
- annotations
- edge
- label
- parent_node
- taxon
Taxon:
[ATTRIBUTES]:
- annotations
TaxonNamespace:
[ATTRIBUTES]:
- annotations
[METHODS]:
- new_taxon
Tree:
[ATTRIBUTES]:
- annotations
- is_rooted
- label
- length_type
- seed_node
[METHODS]:
- node_factory
TreeList:
[METHODS]:
- new_tree
...
---
Annotation:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
AnnotationSet:
[METHODS]:
add_new:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
Edge:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
label:
- dendropy.dataio.nexmlreader._NexmlTreeParser._set_edge_details (w)
length:
- dendropy.dataio.nexmlreader._NexmlTreeParser._set_edge_details (w)
tail_node:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (r)
Node:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
edge:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (r)
label:
- dendropy.dataio.nexmlreader._NexmlTreeParser._parse_nodes (w)
parent_node:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (w)
taxon:
- dendropy.dataio.nexmlreader._NexmlTreeParser._parse_nodes (w)
Taxon:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
TaxonNamespace:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
[METHODS]:
new_taxon:
- dendropy.dataio.nexmlreader.NexmlReader._parse_taxon_namespaces (r)
Tree:
[ATTRIBUTES]:
annotations:
- dendropy.dataio.nexmlreader.NexmlReader._parse_annotations (r)
is_rooted:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (w)
label:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (w)
length_type:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (rw)
seed_node:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (rw)
[METHODS]:
node_factory:
- dendropy.dataio.nexmlreader._NexmlTreeParser.build_tree (r)
TreeList:
[METHODS]:
new_tree:
- dendropy.dataio.nexmlreader.NexmlReader._parse_tree_list (r)
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment