Created
September 4, 2019 16:01
-
-
Save yucer/494d51fc043ab82b177b59254f6ef2dc to your computer and use it in GitHub Desktop.
Find loops in Odoo models and addons with python's networkx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import optparse | |
import erppeek | |
import networkx as nx | |
# Parsed command-line options; assigned by main() and read by info().
options = None
class DL:
    """Debug-level constants; a higher value means more verbose output."""

    # Levels are consecutive integers starting at zero.
    NONE, ERROR, WARN, INFO = range(4)
    # Level assumed when a caller does not specify one.
    DEFAULT = INFO
def info(msg, _level = None):
    """Print *msg* if the configured debug level is at least *_level*.

    Args:
        msg: Text to print.
        _level: One of the ``DL`` constants; a falsy value selects
            ``DL.DEFAULT``.

    Warnings and errors get a ``Warning:`` / ``Error:`` prefix; every message
    is printed with an extra trailing newline.
    """
    level = _level or DL.DEFAULT
    if not options.debug_level >= level:
        return
    # Prefix templates for the levels that decorate the message.
    templates = {
        DL.WARN: 'Warning: %s !\n',
        DL.ERROR: 'Error: %s !\n',
    }
    template = templates.get(level)
    if template is None:
        text = msg + '\n'
    else:
        text = template % msg
    print(text)
def node_id(obj):
    """Return the key used for *obj* in the graph: its ``name`` attribute."""
    # An id-based key ('Module:<id>') was tried before; the name is used now.
    return obj.name
def get_all_modules():
    """Return one flat list with every entry found in ``client.modules()``.

    ``client.modules()`` is treated as a mapping whose values are iterables
    of module names (presumably grouped by module state -- verify against the
    erppeek API); the groups are concatenated in mapping order.
    """
    names = []
    for group in client.modules().values():
        names.extend(group)
    return names
def get_module_obj(name):
    """Look up the ``ir.module.module`` record called *name*.

    Returns:
        The first matching record, or ``False`` when nothing matches.
    """
    matches = client.model('ir.module.module').browse([('name','=',name)])
    if matches and len(matches):
        return matches[0]
    return False
def get_module_graph():
    """Build the directed dependency graph of the modules known to the server.

    Every name returned by ``get_all_modules()`` that resolves to a record
    becomes a node keyed by ``node_id()`` and carrying the ``id``, ``name``,
    ``installed_version`` and ``state`` attributes.  For each entry of a
    module's ``dependencies_id`` that resolves to a module, an edge
    ``module -> dependency`` with ``rel="DEPENDS_ON"`` is added; unresolved
    dependencies are reported as warnings.

    Returns:
        networkx.DiGraph: the dependency graph.
    """
    G = nx.DiGraph()
    info('importing modules')
    all_modules = get_all_modules()
    all_modules.sort()

    # Fetch each module record once and reuse it for the dependency pass,
    # instead of issuing a second lookup per module.
    records = {}
    for im_name in all_modules:
        module = get_module_obj(im_name)
        if not module:
            # Listed by the server but not retrievable; skip it rather than crash.
            info('module %s could not be loaded' % im_name, DL.WARN)
            continue
        records[im_name] = module
        G.add_node(node_id(module),
                   id=module.id,
                   name=module.name,
                   installed_version=module.installed_version,
                   state=module.state)

    info('importing module dependencies')
    # Iterate the sorted name list (not the dict) so edge insertion order
    # does not depend on dict ordering.
    for im_name in all_modules:
        m1 = records.get(im_name)
        if m1 is None:
            continue
        n1 = node_id(m1)
        for dep in m1.dependencies_id:
            dep_name = dep.name
            # Fall back to a lookup for dependencies outside the cached list.
            m2 = records.get(dep_name) or get_module_obj(dep_name)
            if m2:
                G.add_edge(n1, node_id(m2), rel="DEPENDS_ON")
            else:
                info('module %s depends of unexistent module %s' % (im_name, dep_name),
                     DL.WARN)
    return G
def query_deps(G, node_type):
    """Print the dependency sets of *G*, one node table per set.

    A dependency set is a connected component of the undirected version of
    *G*.  Sets are listed smallest first, or biggest first when the
    ``--reverse`` command-line option is set.

    Args:
        G: Directed dependency graph whose nodes carry the attributes read
            by ``print_node_table``.
        node_type: Label used in the set header (e.g. ``'module'``).
    """
    # Honour --reverse; the sort direction used to be hard-coded, so the
    # option had no effect.
    biggest_first = bool(getattr(options, 'reverse', False))
    nx_graph = G.to_undirected()
    # NOTE(review): connected_component_subgraphs and the three-argument
    # topological_sort look like networkx 1.x APIs -- confirm the pinned version.
    comps = sorted(nx.connected_component_subgraphs(nx_graph), key=len,
                   reverse=biggest_first)
    for idx, c in enumerate(comps):
        info('\n#------- set #%d ---- %d %ss ------\n' % (idx, len(c), node_type) )
        # Re-project the component onto the directed graph so it can be ordered.
        cmp_digraph = G.subgraph(c)
        nodes = nx.algorithms.dag.topological_sort(cmp_digraph, None, True)
        print_node_table(G, nodes)
def query_module_deps(G, module, reverse=False):
    """Print the modules reachable from *module* in the dependency graph.

    Args:
        G: Directed dependency graph built by ``get_module_graph``.
        module: Name of the module to start from.
        reverse: When true, walk the reversed graph (i.e. list the modules
            that depend on *module*) and print the traversal in reverse
            post-order.

    An unknown module name is reported as an error instead of raising
    ``IndexError``.
    """
    info('module deps for module %s:' % module)
    _m = get_module_obj(module)
    if not _m:
        # The name comes straight from the command line; fail gracefully.
        info('module %s not found' % module, DL.ERROR)
        return
    source = node_id(_m)
    _G = G.reverse(copy=True) if reverse else G
    _nodes = list(nx.dfs_postorder_nodes(_G, source))
    nodes = _nodes[::-1] if reverse else _nodes
    print_node_table(_G, nodes)
def node_str(G, node):
    """Format *node* of *G* as ``'name (state, installed_version)'``."""
    attrs = G.node[node]
    fields = (attrs['name'], attrs['state'], attrs['installed_version'])
    return '%s (%s, %s)' % fields
def query_module_loops(G, node_type):
    """Print one dependency cycle of *G*, if any exists.

    The search starts at the ``'base'`` node and follows edges backwards
    (``orientation='reverse'``).  Each edge of the cycle is printed as
    ``'<u> -> <v>'`` using ``node_str``; ``"No cycle found !"`` is printed
    when networkx reports no cycle.

    Args:
        G: Directed dependency graph built by ``get_module_graph``.
        node_type: Unused; kept so existing callers keep working.
    """
    try:
        cycle = nx.find_cycle(G, source='base', orientation='reverse')
    except nx.exception.NetworkXNoCycle:
        print("No cycle found !")
        return
    for edge in cycle:
        # With an orientation set, networkx appends the traversal direction
        # to each edge tuple, so only the first two items are the endpoints.
        u, v = edge[0], edge[1]
        print(" -> ".join([node_str(G, u), node_str(G, v)]))
def print_node_table(G, nodes):
    """Print one aligned line per node, or a notice when *nodes* is empty.

    Args:
        G: Graph whose ``node`` mapping holds each node's ``name`` attribute.
        nodes: Iterable of node keys; it is materialised into a list first.
    """
    rows = list(nodes)
    if not rows:
        print('No results where found!')
        return
    # The longest name fixes the column width shared by all rows.
    widest = max([len(G.node[row]['name']) for row in rows])
    for row in rows:
        print_node(G, row, widest)
def print_node(G, node, max_len=20):
    """Print ``'name (installed_version) ----->state'`` for *node*.

    Args:
        G: Graph whose ``node`` mapping holds the node attributes.
        node: Key of the node to print.
        max_len: Width of the longest name in the table; the dash filler is
            sized from it so that the states line up.
    """
    attrs = G.node[node]
    label = '%s (%s)' % (attrs['name'], attrs['installed_version'])
    status = attrs['state']
    filler = '-' * (max_len + 12 - len(label))
    print('%s %s>%s' % (label, filler, status))
def main():
    """Parse the command line, connect through erppeek and run one query.

    Exactly one query is executed, chosen in this order of precedence:
    ``--all-module-deps``, ``--module-deps``, ``--deps-module``,
    ``--module-dep-sets``, ``--module-loops``.  When no query option is
    given, the help text is printed and the process exits with status 2.

    Sets the module-level ``options`` and ``client`` globals used by the
    other functions.
    """
    global options
    global client
    p = optparse.OptionParser()
    p.add_option('--debug-level', '-L', help='0 None, 1 Errors, 2 Warnings, 3 Info (default)',
                 type="int", default=DL.DEFAULT)
    p.add_option('--env', '-e', help='erppeek enviroment section with cx info', default='DEFAULT')
    p.add_option('--all-module-deps', '-A', help='list odoo module dependency components',
                 action="store_true", default=False)
    p.add_option('--module-deps', '-M', help='query module odoo deps')
    p.add_option('--deps-module', '-d', help='query which modules depends on the given one')
    p.add_option('--module-dep-sets', '-S', help='query module dependency sets',
                 action="store_true", default=False)
    p.add_option('--reverse', '-r', help='reverse the order of dependency sets (bigger first) ',
                 action="store_true", default=False)
    p.add_option('--module-loops', '-O', help='query module dependency loops',
                 action="store_true", default=False)
    options, arguments = p.parse_args()
    info("erppeek connection='%s'" % options.env)

    # --module-dep-sets was missing from this check, so it only printed help.
    is_cmd = (options.all_module_deps or options.module_deps
              or options.deps_module or options.module_dep_sets
              or options.module_loops)
    if not is_cmd:
        p.print_help()
        sys.exit(2)

    client = erppeek.Client.from_config(options.env)
    g_modules = get_module_graph()
    if options.all_module_deps:
        query_deps(g_modules, 'module')
    elif options.module_deps:
        query_module_deps(g_modules, options.module_deps)
    elif options.deps_module:
        query_module_deps(g_modules, options.deps_module, reverse=True)
    elif options.module_dep_sets:
        # NOTE(review): this branch used to call an undefined
        # get_module_dep_sets; query_deps prints the "set #N" listing, so it
        # is used here -- confirm this is the intended report.
        query_deps(g_modules, 'module')
    elif options.module_loops:
        query_module_loops(g_modules, 'module')
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The requirements are:
odoorpc
erppeek>=1.6.1
py2neo==3.1.2