-
-
Save blaggacao/50f5b13f5b489071bec3029ec9a4f65c to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*- | |
# Part of Odoo. See LICENSE file for full copyright and licensing details. | |
import datetime | |
import imp | |
import json | |
import logging | |
import os | |
import re | |
import sys | |
import time | |
from collections import namedtuple | |
from contextlib import contextmanager | |
from functools import reduce | |
from inspect import currentframe | |
from itertools import chain, islice | |
from operator import itemgetter | |
from textwrap import dedent | |
import lxml | |
import markdown | |
import psycopg2 | |
from docutils.core import publish_string | |
from dodoo import odoo | |
from psycopg2 import sql | |
from odoo import SUPERUSER_ID, release | |
from odoo.api import Environment | |
from odoo.modules.module import get_module_path | |
from odoo.modules.registry import Registry | |
from odoo.sql_db import db_connect | |
from odoo.tools import UnquoteEvalContext | |
from odoo.tools.func import frame_codeinfo | |
from odoo.tools.mail import html_sanitize | |
try: | |
from odoo.addons.base.models.ir_module import MyWriter | |
except ImportError: | |
# Odoo < 12.0 | |
from odoo.addons.base.module.module import MyWriter | |
_logger = logging.getLogger(__name__) | |
_INSTALLED_MODULE_STATES = ("installed", "to install", "to upgrade") | |
IMD_FIELD_PATTERN = "field_%s__%s" | |
DROP_DEPRECATED_CUSTOM = os.getenv("OE_DROP_DEPRECATED_CUSTOM") | |
# migration environ, used to share data between scripts | |
ENVIRON = {} | |
class MigrationError(Exception): | |
pass | |
# Modules utilities | |
def modules_installed(cr, *modules): | |
"""Check if the provided modules are installed (or about to be).""" | |
assert modules | |
_query = sql.SQL( | |
""" | |
SELECT count(1) | |
FROM ir_module_module | |
WHERE name IN %(names)s | |
AND state IN %(states)s | |
""" | |
) | |
cr.execute(_query, dict(names=modules, states=_INSTALLED_MODULE_STATES)) | |
return cr.fetchone()[0] == len(modules) | |
def module_installed(cr, module): | |
"""Check if the provided module is installed (or about to be).""" | |
return modules_installed(cr, module) | |
def remove_module(cr, module): | |
"""Uninstall the module and delete all its xmlid entries. | |
Make sure to reassign records before calling this method. | |
""" | |
# NOTE: we cannot use the uninstall of module because the given | |
# module need to be currenctly installed and running as deletions | |
# are made using orm. | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM ir_module_module WHERE name = %(module)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
mod_id, = cr.fetchone() or [None] | |
if not mod_id: | |
return | |
# delete constraints only owned by this module | |
_query = sql.SQL( | |
""" | |
SELECT name | |
FROM ir_model_constraint | |
GROUP BY name HAVING array_agg(module) = %(mod_ids)s | |
""" | |
) | |
cr.execute(_query, {"mod_ids": [mod_id]}) | |
constraints = tuple(vals[0] for vals in cr.fetchall()) | |
if constraints: | |
_query = sql.SQL( | |
""" | |
SELECT TABLE_NAME, | |
CONSTRAINT_NAME | |
FROM information_schema.table_constraints | |
WHERE CONSTRAINT_NAME IN %(constraints)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
for table, constraint in cr.fetchall(): | |
_params = { | |
"table": sql.Identifier(table), | |
"constraint": sql.Identifier(constraint), | |
} | |
_query = sql.SQL( | |
""" | |
ALTER TABLE {table} | |
DROP CONSTRAINT {constraint} | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_constraint | |
WHERE module = %(mod_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
# delete data | |
model_ids, field_ids, view_ids, menu_ids = (), (), (), () | |
_query = sql.SQL( | |
""" | |
SELECT model, | |
array_agg(res_id) | |
FROM ir_model_data d | |
WHERE NOT EXISTS | |
(SELECT 1 | |
FROM ir_model_data | |
WHERE id != d.id | |
AND res_id = d.res_id | |
AND model = d.model | |
AND module != d.module) | |
AND module = %(module)s | |
AND model != 'ir.module.module' | |
GROUP BY model | |
""" | |
) | |
cr.execute(_query, locals()) | |
for model, res_ids in cr.fetchall(): | |
if model == "ir.model": | |
model_ids = tuple(res_ids) | |
elif model == "ir.model.fields": | |
field_ids = tuple(res_ids) | |
elif model == "ir.ui.view": | |
view_ids = tuple(res_ids) | |
elif model == "ir.ui.menu": | |
menu_ids = tuple(res_ids) | |
else: | |
table = table_of_model(cr, model) | |
if table_exists(cr, table): | |
_params = {"table": sql.Identifier(table)} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM {table} | |
WHERE id IN %(res_ids)s | |
""" | |
).format(**_params) | |
cr.execute(_query, dict(res_ids=tuple(res_ids))) | |
for view_id in view_ids: | |
remove_view(cr, view_id=view_id, deactivate_custom=True, silent=True) | |
if menu_ids: | |
remove_menus(cr, menu_ids) | |
# remove relations | |
_query = sql.SQL( | |
""" | |
SELECT name | |
FROM ir_model_relation | |
GROUP BY name HAVING array_agg(module) = %(mod_ids)s | |
""" | |
) | |
cr.execute(_query, {"mod_ids": [mod_id]}) | |
relations = tuple(vals[0] for vals in cr.fetchall()) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_relation | |
WHERE module = %(mod_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
if relations: | |
_query = sql.SQL( | |
""" | |
SELECT TABLE_NAME | |
FROM information_schema.tables | |
WHERE TABLE_NAME IN %(relations)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
# fmt: on | |
for (rel,) in cr.fetchall(): | |
_params = {"rel": sql.Identifier(rel)} | |
_query = sql.SQL( | |
""" | |
DROP TABLE {rel} CASCADE | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
if model_ids: | |
_query = sql.SQL( | |
""" | |
SELECT model | |
FROM ir_model | |
WHERE id IN %(model_ids)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
for (model,) in cr.fetchall(): | |
remove_model(cr, model) | |
if field_ids: | |
_query = sql.SQL( | |
""" | |
SELECT model, | |
name | |
FROM ir_model_fields | |
WHERE id IN %(field_ids)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
for model, name in cr.fetchall(): | |
remove_field(cr, model, name) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_data | |
WHERE model = 'ir.module.module' | |
AND res_id = %(mod_id)s; | |
DELETE | |
FROM ir_model_data | |
WHERE module = %(module)s; | |
DELETE | |
FROM ir_module_module | |
WHERE name = %(module)s; | |
DELETE | |
FROM ir_module_module_dependency | |
WHERE name = %(module)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
def rename_module(cr, old, new): | |
"""Rename a module. Yes, really.""" | |
mod_old = "module_" + old | |
mod_new = "module_" + new | |
_query = sql.SQL( | |
""" | |
UPDATE ir_module_module | |
SET name = %(new)s | |
WHERE name = %(old)s; | |
UPDATE ir_module_module_dependency | |
SET name = %(new)s | |
WHERE name = %(old)s; | |
UPDATE ir_model_data | |
SET module = %(new)s | |
WHERE module = %(old)s; | |
UPDATE ir_model_data | |
SET name = %(mod_new)s | |
WHERE name = %(mod_old)s | |
AND module = 'base' | |
AND model = 'ir.module.module'; | |
""" | |
) | |
cr.execute(_query, locals()) | |
def merge_module(cr, old, into, tolerant=False): | |
"""Move all references of module `old` into module `into`. | |
:param str old: source module to merge | |
:param str into: destination module to merge into | |
:tolerant bool: if True, the merge will be skipped if the database | |
does not have the `old` module in its ir_module_module | |
table (e.g. if the module was released after the | |
creation of the database) | |
""" | |
_query = sql.SQL( | |
""" | |
SELECT name, | |
id | |
FROM ir_module_module | |
WHERE name IN %(names)s | |
""" | |
) | |
cr.execute(_query, dict(names=(old, into))) | |
mod_ids = dict(cr.fetchall()) | |
if tolerant and old not in mod_ids: | |
# this can happen in case of temp modules added after a release if the database does not | |
# know about this module, i.e: account_full_reconcile in 9.0 | |
# `into` should be known; let it crash if not | |
_logger.warning("Unknow module %s. Skip merging into %s.", old, into) | |
return | |
def _update(table, old, new): | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_{table} x | |
SET module = %(new)s | |
WHERE module = %(old)s | |
AND NOT EXISTS | |
(SELECT 1 | |
FROM ir_model_{table} y | |
WHERE y.name = x.name | |
AND y.module = %(new)s) | |
""".format( | |
table=table | |
) | |
) | |
cr.execute(_query, locals()) | |
if table == "data": | |
_query = sql.SQL( | |
""" | |
SELECT model, | |
array_agg(res_id) | |
FROM ir_model_data | |
WHERE module = %(old)s | |
AND model NOT LIKE 'ir.model%%' | |
GROUP BY model | |
""" | |
) | |
cr.execute(_query, locals()) | |
for model, res_ids in cr.fetchall(): | |
# we can assume other records have been moved to xml files of the new module | |
# remove the unnecessary data and let the module update do its job | |
if model == "ir.ui.view": | |
for v in res_ids: | |
remove_view(cr, view_id=v, deactivate_custom=True, silent=True) | |
elif model == "ir.ui.menu": | |
remove_menus(cr, tuple(res_ids)) | |
else: | |
for r in res_ids: | |
remove_record(cr, (model, r)) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_{table} | |
WHERE module = %(old)s | |
""".format( | |
table=table | |
) | |
) | |
cr.execute(_query, locals()) | |
_update("constraint", mod_ids[old], mod_ids[into]) | |
_update("relation", mod_ids[old], mod_ids[into]) | |
_update_view_key(cr, old, into) | |
_update("data", old, into) | |
# update dependencies | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_module_module_dependency(module_id, name) | |
SELECT module_id, | |
%(into)s | |
FROM ir_module_module_dependency d | |
WHERE name = %(old)s | |
AND NOT EXISTS | |
(SELECT 1 | |
FROM ir_module_module_dependency o | |
WHERE o.module_id = d.module_id | |
AND o.name = %(into)s); | |
DELETE | |
FROM ir_module_module | |
WHERE name = %(old)s; | |
DELETE | |
FROM ir_module_module_dependency | |
WHERE name = %(old)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
def force_install_module(cr, module, deps=None): | |
"""Force a module to be installed during the upgrade process. | |
:param str module: technical name of the module to install | |
:param list deps: if set, the module will be installed only if any of | |
module of this list is already installed | |
(or about to be) | |
""" | |
subquery = sql.SQL("") | |
if deps: | |
subquery = sql.SQL( | |
""" | |
AND EXISTS | |
(SELECT 1 | |
FROM ir_module_module | |
WHERE name IN %(deps)s | |
AND state IN %(states)s) | |
""" | |
) | |
deps = tuple(deps) | |
states = _INSTALLED_MODULE_STATES | |
_params = {"subquery": subquery} | |
_query = sql.SQL( | |
""" | |
WITH RECURSIVE deps (mod_id, dep_name) AS | |
(SELECT m.id, | |
d.name | |
FROM ir_module_module_dependency d | |
JOIN ir_module_module m ON (d.module_id = m.id) | |
WHERE m.name = %(module)s | |
UNION SELECT m.id, | |
d.name | |
FROM ir_module_module m | |
JOIN deps ON deps.dep_name = m.name | |
JOIN ir_module_module_dependency d ON (d.module_id = m.id)) | |
UPDATE ir_module_module m | |
SET state = CASE | |
WHEN state = 'to remove' THEN 'to upgrade' | |
WHEN state = 'uninstalled' THEN 'to install' | |
ELSE state | |
END, | |
demo= | |
(SELECT demo | |
FROM ir_module_module | |
WHERE name='base') | |
FROM deps d | |
WHERE m.id = d.mod_id {subquery} RETURNING m.name, | |
m.state; | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
state = dict(cr.fetchall()).get(module) | |
return state | |
def new_module_dep(cr, module, new_dep): | |
"""Add a new dependency to a module.""" | |
# One new dep at a time | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_module_module_dependency(name, module_id) | |
SELECT %(new_dep)s, | |
id | |
FROM ir_module_module m | |
WHERE name = %(module)s | |
AND NOT EXISTS | |
(SELECT 1 | |
FROM ir_module_module_dependency | |
WHERE module_id = m.id | |
AND name = %(new_dep)s); | |
""" | |
) | |
cr.execute(_query, locals()) | |
_query = sql.SQL( | |
""" | |
SELECT state | |
FROM ir_module_module | |
WHERE name = %(module)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
mod_state = (cr.fetchone() or ["n/a"])[0] | |
if mod_state in _INSTALLED_MODULE_STATES: | |
# Module was installed, need to install all its deps, recursively, | |
# to make sure the new dep is installed | |
force_install_module(cr, module) | |
def remove_module_deps(cr, module, old_deps): | |
"""Remove a dependencies from a module. | |
:param str module: name of the module whose dependencies are removed | |
:param tuple old_deps: list of dependencies to be removed | |
""" | |
assert isinstance(old_deps, tuple) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_module_module_dependency | |
WHERE module_id = | |
(SELECT id | |
FROM ir_module_module | |
WHERE name = %(module)s) | |
AND name IN %(old_deps)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
def module_deps_diff(cr, module, add=(), remove=()): | |
"""Change the dependencies of a module (adding and removing). | |
:param str module: technical name of the module with dependency changes | |
:param list add: modules to add as dependencies | |
:param list remove: modules to remove as dependencies | |
""" | |
for new_dep in add: | |
new_module_dep(cr, module, new_dep) | |
if remove: | |
remove_module_deps(cr, module, tuple(remove)) | |
def new_module(cr, module, deps=(), auto_install=False): | |
"""Make a new module known to the database. | |
:param str module: technical name of the new module | |
:param list deps: modules to add as dependencies | |
:param bool auto_install: whether the module will auto install if all | |
its dependencies are installed | |
""" | |
_query = sql.SQL( | |
""" | |
SELECT count(1) | |
FROM ir_module_module | |
WHERE name = %(module)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
if cr.fetchone()[0]: | |
# Avoid duplicate entries for module which is already installed, | |
# even before it has become standard module in new version | |
# Also happen for modules added afterward, which should be added by multiple series. | |
return | |
if deps and auto_install: | |
state = "to install" if modules_installed(cr, *deps) else "uninstalled" | |
else: | |
state = "uninstalled" | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_module_module (name, STATE, demo) | |
VALUES (%(module)s, %(state)s, | |
(SELECT demo | |
FROM ir_module_module | |
WHERE name='base')) RETURNING id | |
""" | |
) | |
cr.execute(_query, locals()) | |
new_id, = cr.fetchone() | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_model_data (name, module, noupdate, model, res_id) | |
VALUES ('module_'||%(module)s, | |
'base', | |
't', | |
'ir.module.module', | |
%(new_id)s) | |
""" | |
) | |
cr.execute(_query, locals()) | |
for dep in deps: | |
new_module_dep(cr, module, dep) | |
def force_migration_of_fresh_module(cr, module): | |
"""Force migration scripts to be executed for new modules. | |
In some cases, new (or forced installed) modules need a migration script to grab data | |
form other modules. In that case, forcing its state to be 'to upgrade' will allow | |
migration scripts to be run even if the module was not installed before the migration. | |
""" | |
filename, _ = frame_codeinfo(currentframe(), 1) | |
version = ".".join(filename.split(os.path.sep)[-2].split(".")[:2]) | |
# Force module state to be in `to upgrade`. | |
# Needed for migration script execution. See http://git.io/vnF7f | |
_query = sql.SQL( | |
""" | |
UPDATE ir_module_module | |
SET state = 'to upgrade', | |
latest_version = %(version)s | |
WHERE name = %(module)s | |
AND state = 'to install' RETURNING id | |
""" | |
) | |
cr.execute(_query, locals()) | |
if cr.rowcount: | |
# Force module in `init` mode beside its state is forced to `to upgrade` | |
# See http://git.io/vnF7O | |
odoo.tools.config["init"][module] = "oh yeah!" | |
def split_group(cr, from_groups, to_group): | |
"""Make users members of all `from_groups` members of `to_group`""" | |
def check_group(g): | |
if isinstance(g, str): | |
gid = ref(cr, g) | |
if not gid: | |
_logger.warning("split_group(): Unknow group: %r", g) | |
return gid | |
return g | |
if not isinstance(from_groups, (list, tuple, set)): | |
from_groups = [from_groups] | |
from_groups = [g for g in map(check_group, from_groups) if g] | |
if not from_groups: | |
return | |
if isinstance(to_group, str): | |
to_group = ref(cr, to_group) | |
assert to_group | |
_query = sql.SQL( | |
""" | |
INSERT INTO res_groups_users_rel(uid, gid) | |
SELECT uid, | |
%(to_group)s | |
FROM res_groups_users_rel | |
GROUP BY uid HAVING array_agg(gid) @> %(from_groups)s EXCEPT | |
SELECT uid, | |
gid | |
FROM res_groups_users_rel | |
WHERE gid = %(to_group)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
# Models & Fields utilities | |
def create_m2m(cr, m2m, fk1, fk2, col1=None, col2=None): | |
"""Create a many2many relation table. | |
:param str m2m: relation table name | |
:param str fk1: table referenced by first column | |
:param str fk2: table referenced by second column | |
:param str col1: first column name | |
:param str col2: second column name | |
""" | |
if col1 is None: | |
col1 = "%s_id" % fk1 | |
if col2 is None: | |
col2 = "%s_id" % fk2 | |
_params = { | |
"m2m": sql.Identifier(m2m), | |
"col1": sql.Identifier(col1), | |
"fk1": sql.Identifier(fk1), | |
"col2": sql.Identifier(col2), | |
"fk2": sql.Identifier(fk2), | |
} | |
_query = sql.SQL( | |
""" | |
CREATE TABLE {m2m}({col1} integer NOT NULL REFERENCES {fk1}(id) ON | |
DELETE CASCADE, {col2} integer NOT NULL REFERENCES {fk2}(id) ON | |
DELETE CASCADE, | |
UNIQUE ({col1}, {col2})); | |
CREATE INDEX ON {m2m}({col1}); | |
CREATE INDEX ON {m2m}({col2}); | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
def ensure_m2o_func_field_data(cr, src_table, column, dst_table): | |
"""Fix broken many2one relations. | |
If any `column` not present in `dst_table`, remove column from `src_table` in | |
order to force recomputation of the function field | |
WARN: only call this method on m2o function/related fields!! | |
""" | |
if not column_exists(cr, src_table, column): | |
return | |
_params = { | |
"src_table": sql.Identifier(src_table), | |
"column": sql.Identifier(column), | |
"dst_table": sql.Identifier(dst_table), | |
} | |
_query = sql.SQL( | |
""" | |
SELECT count(1) | |
FROM {src_table} | |
WHERE {column} NOT IN | |
(SELECT id | |
FROM {dst_table}) | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
if cr.fetchone()[0]: | |
remove_column(cr, src_table, column, cascade=True) | |
def uniq_tags(cr, model, uniq_column="name", order="id"): | |
"""Deduplicate "tag" models entries. | |
Should only be referenced as many2many | |
By using `uniq_column=lower(name)` and `order=name` | |
you can prioritize tags in CamelCase/UPPERCASE. | |
""" | |
table = table_of_model(cr, model) | |
_upds = sql.SQL("") | |
for ft, fc, _, da in get_fk(cr, table): | |
assert da == "c" # should be a ondelete=cascade fk | |
cols = get_columns(cr, ft, ignore=(fc,))[0] | |
assert len(cols) == 1 # it's a m2, should have only 2 columns | |
_params = { | |
"rel": sql.Indentifier(ft), | |
"c1": sql.Identifier(cols[0]), | |
"c2": sql.Identifier(cols[1]), | |
} | |
_upds += sql.SQL( | |
""" | |
INSERT INTO {rel}({c1}, {c2}) | |
SELECT r.{c1}, d.id | |
FROM {rel} r | |
JOIN dups d ON (r.{c2} = ANY(d.others)) EXCEPT | |
SELECT r.{c1}, r.{c2} | |
FROM {rel} r | |
JOIN dups d ON (r.{c2} = d.id) | |
""" | |
).format(**_params) | |
assert _upds # if not m2m found, there is something wrong... | |
# TODO: There is an issue here, find out teleologically, what was meant. | |
updates = sql.SQL(", ".join("_upd_%s AS (%s)" % x for x in enumerate(_upds))) | |
_params = { | |
"table": sql.Identifier(table), | |
"uniq_column": sql.Identifier(uniq_column), | |
"updates": updates, | |
} | |
_query = sql.SQL( | |
""" | |
WITH dups AS | |
( SELECT (array_agg(id ORDER BY {order}))[1] AS id, | |
(array_agg(id ORDER BY {order}))[2:array_length(array_agg(id), 1)] AS others | |
FROM {table} | |
GROUP BY {uniq_column} HAVING count(id) > 1 ), _upd_imd AS | |
( UPDATE ir_model_data x | |
SET res_id = d.id | |
FROM dups d | |
WHERE x.model = %(model)s | |
AND x.res_id = ANY(d.others) ), {updates} | |
DELETE | |
FROM {table} | |
WHERE id IN ( | |
SELECT unnest(others) | |
FROM dups) | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def remove_field(cr, model, fieldname, cascade=False): | |
"""Remove a field. | |
:param str model: name of the field's model | |
:param str fieldname: name of the field (...) | |
:param bool cascade: if True, all records having a FKEY pointing to this field | |
will be cascade-deleted (default: False) | |
""" | |
if fieldname == "id": | |
# called by `remove_module`. May happen when a model defined in a removed module was | |
# overwritten by another module in previous versions. | |
return remove_model(cr, model) | |
# clean dashboards' `group_by` | |
_query = sql.SQL( | |
""" | |
SELECT array_agg(f.name), | |
array_agg(aw.id) | |
FROM ir_model_fields f | |
JOIN ir_act_window aw ON aw.res_model = f.model | |
WHERE f.model = %s | |
AND f.name = %s | |
GROUP BY f.model | |
""" | |
) | |
cr.execute(_query, locals()) | |
for fields, actions in cr.fetchall(): | |
_query = sql.SQL( | |
""" | |
SELECT id, | |
arch | |
FROM ir_ui_view_custom | |
WHERE arch ~ %s | |
""" | |
) | |
cr.execute(_query, ["name=[\"'](%s)[\"']" % "|".join(map(str, actions))]) | |
for id, arch in ((x, lxml.etree.fromstring(y)) for x, y in cr.fetchall()): | |
for action in arch.iterfind(".//action"): | |
context = eval(action.get("context", "{}"), UnquoteEvalContext()) | |
if context.get("group_by"): | |
context["group_by"] = list(set(context["group_by"]) - set(fields)) | |
action.set("context", str(context)) | |
_query = sql.SQL( | |
""" | |
UPDATE ir_ui_view_custom | |
SET arch = %s | |
WHERE id = %s | |
""" | |
) | |
cr.execute(_query, [lxml.etree.tostring(arch, encoding="unicode"), id]) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_fields | |
WHERE model=%(model)s | |
AND name=%(fieldname)s RETURNING id | |
""" | |
) | |
cr.execute(_query, locals()) | |
fids = tuple(vals[0] for vals in cr.fetchall()) | |
if fids: | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_data | |
WHERE model = 'ir.model.fields' | |
AND res_id IN %(fids)s | |
""" | |
) | |
cr.execute(_query, **locals()) | |
# cleanup translations | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_translation | |
WHERE name = %s | |
AND TYPE IN ('field', | |
'help', | |
'model', | |
'selection') -- ignore wizard_* translations | |
""" | |
) | |
cr.execute(_query, ["{},{}".format(model, fieldname)]) | |
table = table_of_model(cr, model) | |
remove_column(cr, table, fieldname, cascade=cascade) | |
def move_field_to_module(cr, model, fieldname, old_module, new_module): | |
"""Move a field to another module.""" | |
name = IMD_FIELD_PATTERN % (model.replace(".", "_"), fieldname) | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET module=%(new_module)s | |
WHERE model='ir.model.fields' | |
AND name=%(name)s | |
AND module=%(old_module)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
def rename_field(cr, model, old, new, update_references=True): | |
"""Rename a module. Yes, really. | |
:param bool update_references: if True, references to that field in | |
filters, saved exports, etc. will be | |
adapted (default: True) | |
""" | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_fields | |
SET name = %(new)s | |
WHERE model = %(model)s | |
AND name = %(old)s RETURNING id | |
""" | |
) | |
cr.execute(_query, locals()) | |
[fid] = cr.fetchone() or [None] | |
if fid: | |
name = IMD_FIELD_PATTERN % (model.replace(".", "_"), new) | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET name=%(name)s | |
WHERE model='ir.model.fields' | |
AND res_id=%(fid)s; | |
UPDATE ir_property | |
SET name=%(new)s | |
WHERE fields_id=%(fid)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
_query = sql.SQL( | |
""" | |
UPDATE ir_translation | |
SET name = %s | |
WHERE name = %s | |
AND TYPE IN ('field', | |
'help', | |
'model', | |
'selection') -- ignore wizard_* translations | |
""" | |
) | |
cr.execute(_query, ["{},{}".format(model, new), "{},{}".format(model, old)]) | |
table = table_of_model(cr, model) | |
# NOTE table_exists is needed to avoid altering views | |
if table_exists(cr, table) and column_exists(cr, table, old): | |
_params = { | |
"table": sql.Identifier(table), | |
"old": sql.Identifier(old), | |
"new": sql.Identifier(new), | |
} | |
_query = sql.SQL( | |
""" | |
ALTER TABLE {table} RENAME COLUMN {old} TO {new} | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
if update_references: | |
update_field_references(cr, old, new, only_models=(model,)) | |
def make_field_company_dependent( | |
cr, | |
model, | |
field, | |
field_type, | |
target_model=None, | |
default_value=None, | |
default_value_ref=None, | |
company_field="company_id", | |
): | |
"""Convert a field to be company dependent (old `property` field attributes). | |
Notes: | |
`target_model` is only use when `type` is "many2one". | |
The `company_field` can be an sql expression. | |
You may use `t` to refer the model's table. | |
""" | |
type2field = { | |
"char": "value_text", | |
"float": "value_float", | |
"boolean": "value_integer", | |
"integer": "value_integer", | |
"text": "value_text", | |
"binary": "value_binary", | |
"many2one": "value_reference", | |
"date": "value_datetime", | |
"datetime": "value_datetime", | |
"selection": "value_text", | |
} | |
assert field_type in type2field | |
value_field = type2field[field_type] | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM ir_model_fields | |
WHERE model = %(model)s | |
AND name = %(field)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
[fields_id] = cr.fetchone() | |
table = table_of_model(cr, model) | |
_params = { | |
"field": sql.Identifier(field), | |
"value_field": sql.Identifier(value_field), | |
"company_field": sql.Identifier(company_field), | |
"table": sql.Identifier(table), | |
"target_model_prefix": sql.Literal("{},".format(target_model)), | |
"model_prefix": sql.Literal("{},".format(model)), | |
} | |
if default_value is None: | |
where_clause = sql.SQL( | |
""" | |
{field} IS NOT NULL | |
""" | |
).format(**_params) | |
else: | |
where_clause = sql.SQL( | |
""" | |
{field} != %(default_value)s | |
""" | |
).format(**_params) | |
if field_type != "many2one": | |
value_select = sql.Identifier(field) | |
else: | |
# for m2o, the store value is a reference field, so in format `model,id` | |
value_select = sql.SQL( | |
""" | |
CONCAT({target_model_prefix}, {field}) | |
""" | |
).format(**_params) | |
_params = dict( | |
{"value_select": value_select, "where_clause": where_clause}, **_params | |
) | |
# TODO: remove me when anonimization module is removed | |
if is_field_anonymized(cr, model, field): | |
# if field is anonymized, we need to create a property for each record | |
where_clause = "true" | |
# and we need to unanonymize its values | |
ano_default_value = cr.mogrify("%s", [default_value]) | |
if field_type != "many2one": | |
ano_value_select = "%(value)s" | |
else: | |
ano_value_select = sql.SQL( | |
""" | |
CONCAT({target_model_prefix}, %(value)s) | |
""" | |
).format(**_params) | |
register_unanonymization_query( | |
cr, | |
model, | |
field, | |
""" | |
UPDATE ir_property | |
SET {value_field} = CASE WHEN %(value)s IS NULL THEN {ano_default_value} | |
ELSE {ano_value_select} END | |
WHERE res_id = CONCAT({model_prefix}, %(id)s) | |
AND name='{field}' | |
AND type='{field_type}' | |
AND fields_id={fields_id} | |
""".format( | |
**locals() | |
), | |
) | |
_query = sql.SQL( | |
""" | |
WITH cte AS | |
( SELECT CONCAT({model_prefix}, id) AS res_id, | |
{value_select} AS value, | |
({company_field})::integer AS company | |
FROM {table} t | |
WHERE {where_clause} ) | |
INSERT INTO ir_property(name, type, fields_id, company_id, res_id, {value_field}) | |
SELECT %(field)s, | |
%(field_type)s, | |
%(fields_id)s, | |
cte.company, | |
cte.res_id, | |
cte.value | |
FROM cte | |
WHERE NOT EXISTS | |
(SELECT 1 | |
FROM ir_property | |
WHERE fields_id = %(fields_id)s | |
AND COALESCE(company_id, 0) = COALESCE(cte.company, 0) | |
AND res_id=cte.res_id) | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# default property | |
if default_value: | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_property(name, type, fields_id, {value_field}) | |
VALUES (%(field)s, | |
%(field_type)s, | |
%(fields_id)s, | |
%(default_value)s) RETURNING id | |
""" | |
) | |
cr.execute(_query, locals()) | |
[prop_id] = cr.fetchone() | |
if default_value_ref: | |
module, _, xid = default_value_ref.partition(".") | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_model_data (module, name, model, res_id, noupdate) | |
VALUES (%(module)s, | |
%(xid)s, | |
'ir.property', | |
%(prop_id)s, | |
TRUE) | |
""" | |
) | |
cr.execute(_query, locals()) | |
remove_column(cr, table, field, cascade=True) | |
def is_field_anonymized(cr, model, field): | |
"""Check if a field has been anonymized prior to the migration.""" | |
if not module_installed(cr, "anonymization"): | |
return False | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM ir_model_fields_anonymization | |
WHERE model_name = %(model)s | |
AND field_name = %(field)s | |
AND STATE = 'anonymized' | |
""" | |
) | |
cr.execute(_query, locals()) | |
return bool(cr.rowcount) | |
def update_field_references(cr, old, new, only_models=None): | |
"""Replaces references to a field in several crash-prone places. | |
Replace all references to field `old` to `new` in: | |
- ir_filters | |
- ir_exports_line | |
- ir_act_server | |
- ir_rule | |
- mail.mass_mailing | |
:param list only_models: list of models affected by the fieldname change | |
""" | |
p = { | |
"old": r"\y{}\y".format(old), | |
"new": new, | |
"def_old": r"\ydefault_{}\y".format(old), | |
"def_new": "default_{}".format(new), | |
"models": tuple(only_models) if only_models else (), | |
} | |
_query = sql.SQL( | |
""" | |
UPDATE ir_filters | |
SET domain = regexp_replace(domain, %(old)s, %(new)s, 'g'), | |
context = regexp_replace(regexp_replace(context, %(old)s, %(new)s, 'g'), %(def_old)s, %(def_new)s, 'g') | |
""" | |
) | |
if column_exists(cr, "ir_filters", "sort"): | |
_query += sql.SQL(", sort = regexp_replace(sort, %(old)s, %(new)s, 'g')") | |
if only_models: | |
_query += sql.SQL(" WHERE model_id IN %(models)s") | |
cr.execute(_query, p) | |
# ir.exports.line | |
_query = sql.SQL( | |
""" | |
UPDATE ir_exports_line l | |
SET name = regexp_replace(l.name, %(old)s, %(new)s, 'g') | |
""" | |
) | |
if only_models: | |
_query += sql.SQL( | |
""" | |
FROM ir_exports e | |
WHERE e.id = l.export_id | |
AND e.resource IN %(models)s | |
""" | |
) | |
cr.execute(_query, p) | |
# ir.action.server | |
col_prefix = "" | |
if not column_exists(cr, "ir_act_server", "condition"): | |
col_prefix = "--" # sql comment the line | |
_query = sql.SQL( | |
""" | |
UPDATE ir_act_server s | |
SET {col_prefix} condition = regexp_replace(condition, %(old)s, %(new)s, 'g'), | |
code = regexp_replace(code, %(old)s, %(new)s, 'g') | |
""".format( | |
col_prefix=col_prefix | |
) | |
) | |
if only_models: | |
_query += sql.SQL( | |
""" | |
FROM ir_model m | |
WHERE m.id = s.model_id | |
AND m.model IN %(models)s | |
AND | |
""" | |
) | |
else: | |
_query += sql.SQL(" WHERE ") | |
_query += sql.SQL("s.state = 'code'") | |
cr.execute(_query, p) | |
# ir.rule | |
_query = sql.SQL( | |
""" | |
UPDATE ir_rule r | |
SET domain_force = regexp_replace(domain_force, %(old)s, %(new)s, 'g') | |
""" | |
) | |
if only_models: | |
_query += sql.SQL( | |
""" | |
FROM ir_model m | |
WHERE m.id = r.model_id | |
AND m.model IN %(models)s | |
""" | |
) | |
cr.execute(_query, p) | |
# mass mailing | |
if column_exists(cr, "mail_mass_mailing", "mailing_domain"): | |
_query = sql.SQL( | |
""" | |
UPDATE mail_mass_mailing u | |
SET mailing_domain = regexp_replace(u.mailing_domain, %(old)s, %(new)s, 'g') | |
""" | |
) | |
if only_models: | |
if column_exists(cr, "mail_mass_mailing", "mailing_model_id"): | |
_query += sql.SQL( | |
""" | |
FROM ir_model m | |
WHERE m.id = u.mailing_model_id | |
AND m.model IN %(models)s | |
""" | |
) | |
else: | |
_query += sql.SQL("WHERE u.mailing_model IN %(models)s") | |
cr.execute(_query, p) | |
def recompute_fields(cr, model, fields, ids=None, logger=_logger, chunk_size=256): | |
"""Recompute fields using the ORM. | |
:param str model: | |
:param list fields: | |
:param list ids: list of ids of records on which the recompute will be done; | |
if not set, will recompute for all records | |
:param logger: logger used during the processing (default: current logger) | |
:param integer chunk_size: batch size for recomputing (default: 256) | |
""" | |
if ids is None: | |
_query = sql.SQL( | |
""" | |
SELECT id FROM "%(table)s" | |
""" | |
) | |
cr.execute(_query, dict(table=table_of_model(cr, model))) | |
ids = tuple(map(itemgetter(0), cr.fetchall())) | |
Model = env(cr)[model] | |
size = (len(ids) + chunk_size - 1) / chunk_size | |
qual = "%s %d-bucket" % (model, chunk_size) if chunk_size != 1 else model | |
for subids in log_progress( | |
chunks(ids, chunk_size, list), qualifier=qual, logger=logger, size=size | |
): | |
records = Model.browse(subids) | |
for field in fields: | |
records._recompute_todo(records._fields[field]) | |
records.recompute() | |
records.invalidate_cache() | |
def fix_wrong_m2o(cr, table, column, target, value=None): | |
"""Fix missing foreign keys references. | |
:param str table: table to correct | |
:param str column: column to correct | |
:param str target: destination table of the FKEY | |
:param value: value to set instead of the missing foreign key reference | |
that will be parsed by psycopg2 (variable type) | |
(default: None - NULL) | |
""" | |
_params = { | |
"table": sql.Identifier(table), | |
"target": sql.Identifier(target), | |
"column": sql.Identifier(column), | |
} | |
_query = sql.SQL( | |
""" | |
WITH wrongs_m2o AS | |
( SELECT s.id | |
FROM {table} s | |
LEFT JOIN {target} t ON s.{column} = t.id | |
WHERE s.{column} IS NOT NULL | |
AND t.id IS NULL ) | |
UPDATE {table} s | |
SET {column} = %(value)s | |
FROM wrongs_m2o w | |
WHERE s.id = w.id | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def remove_model(cr, model, drop_table=True): | |
"""Remove a model 😉.""" | |
model_underscore = model.replace(".", "_") | |
# remove references | |
for ir in indirect_references(cr): | |
if ir.table == "ir_model": | |
continue | |
_params = { | |
"table": sql.Identifier(ir.table), | |
"where_clause": sql.SQL(ir.model_filter(placeholder="%(model)s")), | |
} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM {table} | |
WHERE {where_clause} RETURNING id | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
ids = tuple(vals[0] for vals in cr.fetchall()) | |
remove_refs(cr, model_of_table(cr, ir.table), ids) | |
remove_refs(cr, model) | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM ir_model | |
WHERE model = %(model)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
[mod_id] = cr.fetchone() or [None] | |
if mod_id: | |
# some required fk are "ON DELETE SET NULL". | |
for tbl in "base_action_rule google_drive_config".split(): | |
if column_exists(cr, tbl, "model_id"): | |
_params = {"tbl": sql.Identifier(tbl)} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM {tbl} | |
WHERE model_id = %(mod_id)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_constraint | |
WHERE model=%(mod_id)s; | |
DELETE | |
FROM ir_model_relation | |
WHERE model=%(mod_id)s; | |
--- Drop XML IDs of ir.rule and ir.model.access records that will be cascade-dropped, | |
--- when the ir.model record is dropped - just in case they need to be re-created | |
DELETE | |
FROM ir_model_data x USING ir_rule a | |
WHERE x.res_id = a.id | |
AND x.model='ir.rule' | |
AND a.model_id = %(mod_id)s; | |
DELETE | |
FROM ir_model_data x USING ir_model_access a | |
WHERE x.res_id = a.id | |
AND x.model='ir.model.access' | |
AND a.model_id = %(mod_id)s; | |
DELETE | |
FROM ir_model | |
WHERE id=%(mod_id)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_data | |
WHERE model='ir.model' | |
AND name = %(name)s; | |
DELETE | |
FROM ir_model_data | |
WHERE model='ir.model.fields' | |
AND name LIKE %(name_like)s; | |
""" | |
) | |
cr.execute( | |
_query, | |
dict( | |
name="model_{}".format(model_underscore), | |
name_like=(IMD_FIELD_PATTERN % (model_underscore, "%")).replace("_", r"\_"), | |
), | |
) | |
table = table_of_model(cr, model) | |
_params = {"table": sql.Identifier(table)} | |
if drop_table: | |
if table_exists(cr, table): | |
_query = sql.SQL( | |
""" | |
DROP TABLE {table} CASCADE | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
elif view_exists(cr, table): | |
# For auto=False models... | |
_query = sql.SQL( | |
""" | |
DROP VIEW {table} CASCADE | |
""" | |
).format(**_params) | |
cr.execute(_query) | |
def remove_refs(cr, model, ids=None): | |
"""Remove non-sql enforced references pointing to the specified model. | |
e.g. reference fields, translations, ... | |
""" | |
if ids is None: | |
match = sql.SQL("like %(needle)s") | |
needle = model + ",%" | |
else: | |
if not ids: | |
return | |
match = sql.SQL("in %(needle)s") | |
needle = tuple("{},{}".format(model, i) for i in ids) | |
# "model-comma" fields | |
_query = sql.SQL( | |
""" | |
SELECT model, | |
name | |
FROM ir_model_fields | |
WHERE ttype = 'reference' | |
UNION | |
SELECT 'ir.translation', | |
'name' | |
""" | |
) | |
cr.execute(_query) | |
for ref_model, ref_column in cr.fetchall(): | |
table = table_of_model(cr, ref_model) | |
# NOTE table_exists is needed to avoid deleting from views | |
if table_exists(cr, table) and column_exists(cr, table, ref_column): | |
_params = { | |
"table": sql.Identifier(table), | |
"ref_column": sql.Identifier(ref_column), | |
"match": match, | |
} | |
query_tail = sql.SQL( | |
""" | |
FROM {table} | |
WHERE {ref_column} {match} | |
""" | |
).format(**_params) | |
if ref_model == "ir.ui.view": | |
_query = sql.SQL("""SELECT id """) + query_tail | |
cr.execute(_query, locals()) | |
for (view_id,) in cr.fetchall(): | |
remove_view( | |
cr, view_id=view_id, deactivate_custom=True, silent=True | |
) | |
elif ref_model == "ir.ui.menu": | |
_query = sql.SQL("""SELECT id """) + query_tail | |
cr.execute(_query, locals()) | |
menu_ids = tuple(m[0] for m in cr.fetchall()) | |
remove_menus(cr, menu_ids) | |
else: | |
_query = sql.SQL("""DELETE """) + query_tail | |
cr.execute(_query, locals()) | |
# TODO make it recursive? | |
if table_exists(cr, "ir_values"): | |
column, _ = _ir_values_value(cr) | |
_params = {"column": sql.SQL(column), "match": match} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_values | |
WHERE {column} {match} | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
if ids is None: | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_translation | |
WHERE name = %(model)s | |
AND TYPE IN ('constraint', | |
'sql_constraint', | |
'view', | |
'report', | |
'rml', | |
'xsl') | |
""" | |
) | |
cr.execute(_query, locals()) | |
def move_model(cr, model, from_module, to_module, move_data=False, delete=False): | |
"""Move model `model` from `from_module` to `to_module`. | |
:param bool move_data: reassign all xmlids from the `from_module` | |
referencing `model` records to `to_module` | |
(default: False) | |
:param bool delete: if True and `to_module` is not installed, | |
delete the model (default: False) | |
""" | |
if delete and not module_installed(cr, to_module): | |
remove_model(cr, model) | |
return | |
model_underscore = model.replace(".", "_") | |
name = "model_%s" % model_underscore | |
name_like = (IMD_FIELD_PATTERN % (model_underscore, "%")).replace("_", r"\_") | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET module = %(to_module)s | |
WHERE module = %(from_module)s | |
AND model = 'ir.model' | |
AND name = %(name)s; | |
UPDATE ir_model_data | |
SET module = %(to_module)s | |
WHERE module = %(from_module)s | |
AND model='ir.model.fields' | |
AND name LIKE %(name)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
if move_data: | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET module = %(to_module)s | |
WHERE module = %(from_module)s | |
AND model = %(model)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
def rename_model(cr, old, new, rename_table=True): | |
"""Rename a model. | |
:param bool rename_table: if True, the table will be renamed according | |
to the Odoo ORM (e.g. `ir.rule` -> `ir_rule`) | |
""" | |
if rename_table: | |
old_table = table_of_model(cr, old) | |
new_table = table_of_model(cr, new) | |
_params = { | |
"old_table": sql.Identifier(old_table), | |
"new_table": sql.Identifier(new_table), | |
"old_table_seq_id": sql.Identifier("{}_id_seq".format(old_table)), | |
"new_table_sqe_id": sql.Identifier("{}_id_seq".format(new_table)), | |
} | |
_query = sql.SQL( | |
""" | |
ALTER TABLE {old_table} RENAME TO {new_table}; | |
ALTER SEQUENCE {old_table_seq_id} RENAME TO {new_table_sqe_id}; | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# find & rename primary key, may still use an old name from a former migration | |
_query = sql.SQL( | |
""" | |
SELECT conname | |
FROM pg_index, | |
pg_constraint | |
WHERE indrelid = %(new_table)s::regclass | |
AND indisprimary | |
AND conrelid = indrelid | |
AND conindid = indexrelid | |
AND confrelid = 0; | |
""" | |
) | |
cr.execute(_query, locals()) | |
primary_key, = cr.fetchone() | |
_params = dict( | |
{ | |
"primary_key": sql.Identifier(primary_key), | |
"new_table_pkey": sql.Identifier("{}_pkey".format(new_table)), | |
}, | |
**_params | |
) | |
_query = sql.SQL( | |
""" | |
ALTER INDEX {primary_key} RENAME TO {new_table_pkey} | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# DELETE all constraints and indexes (ignore the PK), ORM will recreate them. | |
_query = sql.SQL( | |
""" | |
SELECT CONSTRAINT_NAME | |
FROM information_schema.table_constraints | |
WHERE TABLE_NAME = %(new_table)s | |
AND constraint_type != 'PRIMARY KEY' | |
AND constraint_name !~ '^[0-9_]+_not_null$' | |
""" | |
) | |
cr.execute(_query, locals()) | |
for (constrain,) in cr.fetchall(): | |
_params = { | |
"constrain": sql.Identifier(constrain), | |
"new_table": sql.Identifier(new_table), | |
} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_constraint | |
WHERE name = %(constrain)s; | |
ALTER TABLE {new_table} | |
DROP CONSTRAINT {constrain}; | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
updates = [r[:2] for r in res_model_res_id(cr)] | |
for model, column in updates: | |
table = table_of_model(cr, model) | |
_params = {"table": sql.Identifier(table), "column": sql.Identifier(column)} | |
_query = sql.SQL( | |
""" | |
UPDATE {table} | |
SET {column} = %(new)s | |
WHERE {column} = %(old)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# "model-comma" fields | |
_query = sql.SQL( | |
""" | |
SELECT model, | |
name | |
FROM ir_model_fields | |
WHERE ttype = 'reference' | |
UNION | |
SELECT 'ir.translation', | |
'name' | |
""" | |
) | |
cr.execute(_query, locals()) | |
old_like = "{},%".format(old) | |
substr_from = len(old) | |
for model, column in cr.fetchall(): | |
table = table_of_model(cr, model) | |
if column_exists(cr, table, column): | |
_params = {"table": sql.Identifier(table), "column": sql.Identifier(column)} | |
_query = sql.SQL( | |
""" | |
UPDATE {table} | |
SET {column} = %(new)s || substring({column} FROM %(substr_from)s) | |
WHERE {column} LIKE %(old_like)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
if table_exists(cr, "ir_values"): | |
column_read, cast_write = _ir_values_value(cr) | |
_params = { | |
"cast0": sql.SQL(cast_write.partition("%s")[0]), | |
"cast2": sql.SQL(cast_write.partition("%s")[2]), | |
"column": sql.SQL(column_read), | |
} | |
_query = sql.SQL( | |
""" | |
UPDATE ir_values | |
SET value = {cast0}%(new)s || substring({column} FROM %(substr_from)s){cast2} | |
WHERE {column} LIKE %(old_like)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
old_underscore = old.replace(".", "_") | |
new_underscore = new.replace(".", "_") | |
new_name = "model_%s" % new_underscore | |
old_name = "model_%s" % old_underscore | |
new_field_name = "field_%s" % new_underscore | |
old_field_prefix_length = len(old_underscore) + 6 | |
name_like = (IMD_FIELD_PATTERN % (old_underscore, "%")).replace("_", r"\_") | |
_query = sql.SQL( | |
""" | |
UPDATE ir_translation | |
SET name = %(new)s | |
WHERE name = %(old)s | |
AND TYPE IN ('constraint', | |
'sql_constraint', | |
'view', | |
'report', | |
'rml', | |
'xsl'); | |
UPDATE ir_model_data | |
SET name = %(new_name)s | |
WHERE model = 'ir.model' | |
AND name = %(old_name)s; | |
UPDATE ir_model_data | |
SET name = %(new_field_name)s || substring(name FROM %(old_field_prefix_length)s) | |
WHERE model = 'ir.model.fields' | |
AND name LIKE %(name_like)s; | |
""" | |
) | |
cr.execute(_query, locals()) | |
col_prefix = "" | |
if not column_exists(cr, "ir_act_server", "condition"): | |
col_prefix = "--" # sql comment the line | |
_params = { | |
"col_prefix": sql.SQL(col_prefix), | |
"old": sql.SQL(old), | |
"new": sql.SQL(new), | |
} | |
_query = sql.SQL( | |
r""" | |
UPDATE ir_act_server | |
-- regex matching model name wrapped by quotes eg in env['model.name'] or env["model.name"] | |
SET {col_prefix} condition = regexp_replace(condition, '([''"]){old}\1', '\1{new}\1', 'g'), | |
code = regexp_replace(code, '([''"]){old}\1', '\1{new}\1', 'g') | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def replace_record_references(cr, old, new, replace_xmlid=True): | |
"""Replace all (in)direct references of a record by another""" | |
assert isinstance(old, tuple) and len(old) == 2 | |
assert isinstance(new, tuple) and len(new) == 2 | |
if not old[1]: | |
return | |
return replace_record_references_batch( | |
cr, {old[1]: new[1]}, old[0], new[0], replace_xmlid | |
) | |
def replace_record_references_batch( | |
cr, id_mapping, model_src, model_dst=None, replace_xmlid=True | |
): | |
assert id_mapping | |
assert all(isinstance(v, int) and isinstance(k, int) for k, v in id_mapping.items()) | |
if model_dst is None: | |
model_dst = model_src | |
old = tuple(id_mapping.keys()) | |
new = tuple(id_mapping.values()) | |
jmap = json.dumps(id_mapping) | |
def genmap(fmt_k, fmt_v=None): | |
# generate map using given format | |
fmt_v = fmt_k if fmt_v is None else fmt_v | |
m = {fmt_k % k: fmt_v % v for k, v in id_mapping.items()} | |
return json.dumps(m), tuple(m.keys()) | |
if model_src == model_dst: | |
# 7 time faster than using pickle.dumps | |
pmap, pmap_keys = genmap("I%d\n.") | |
smap, smap_keys = genmap("%d") | |
column_read, cast_write = _ir_values_value(cr) | |
for table, fk, _, _ in get_fk(cr, table_of_model(cr, model_src)): | |
_params = {"table": sql.Identifier(table), "fk": sql.Identifier(fk)} | |
jmap_query = sql.SQL( | |
""" | |
UPDATE {table} t | |
SET {fk} = (%(jmap)s::json->>{fk}::varchar)::int4 | |
WHERE {fk} IN %(old)s | |
""" | |
).format(**_params) | |
col2 = None | |
if not column_exists(cr, table, "id"): | |
# seems to be a m2m table. Avoid duplicated entries | |
cols = get_columns(cr, table, ignore=(fk,))[0] | |
assert len(cols) == 1 # it's a m2, should have only 2 columns | |
col2 = cols[0] | |
_params = { | |
"table": sql.Identifier(table), | |
"fk": sql.Identifier(fk), | |
"col2": sql.Identifier(col2), | |
"jmap_query": jmap_query, | |
} | |
_query = sql.SQL( | |
""" | |
WITH _existing AS | |
( SELECT {col2} | |
FROM {table} | |
WHERE {fk} IN %(new)s ) {jmap_query} | |
AND NOT EXISTS | |
(SELECT 1 | |
FROM _existing | |
WHERE {col2}=t.{col2}); | |
DELETE | |
FROM {table} WHERE {fk} IN %(old)s; | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
if not col2: # it's a model | |
# update default values | |
# TODO? update all defaults using 1 query (using `WHERE (model, name) IN ...`) | |
model = model_of_table(cr, table) | |
if table_exists(cr, "ir_values"): | |
_params = { | |
"cast0": sql.SQL(cast_write.partition("%s")[0]), | |
"cast2": sql.SQL(cast_write.partition("%s")[2]), | |
"column": sql.Identifier(column_read), | |
} | |
_query = sql.SQL( | |
""" | |
UPDATE ir_values | |
SET value = {cast0} %(pmap)s::json->>({column}) {cast2} | |
WHERE KEY='default' | |
AND model = %(model)s | |
AND name = %(fk)s | |
AND {column} IN %(pmap_keys)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
else: | |
_query = sql.SQL( | |
""" | |
UPDATE ir_default d | |
SET json_value = %(smap)s::json->>json_value | |
FROM ir_model_fields f | |
WHERE f.id = d.field_id | |
AND model = %(model)s | |
AND name = %(fk)s | |
AND d.json_value IN %(pmap_keys)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
# indirect references | |
for ir in indirect_references(cr, bound_only=True): | |
if ir.table == "ir_model_data" and not replace_xmlid: | |
continue | |
_params = {} | |
upd = sql.SQL("") | |
if ir.res_model: | |
_params["model"] = sql.Identifier(ir.res_model) | |
upd += sql.SQL("{model} = %(model_dst)s,").format(**_params) | |
if ir.res_model_id: | |
_params["model_id"] = sql.Identifier(ir.res_model_id) | |
upd += sql.SQL( | |
"{model_id} = (SELECT id FROM ir_model WHERE model = %(model_dst)s)," | |
).format(**_params) | |
where = sql.SQL(ir.model_filter(placeholder="%(model_src)s")) | |
_params = dict( | |
{ | |
"table": sql.Identifier(ir.table), | |
"res_id": sql.Identifier(ir.res_id), | |
"upd": upd, | |
"where": where, | |
}, | |
**_params | |
) | |
_query = sql.SQL( | |
""" | |
UPDATE {table} | |
SET {upd} {res_id} = (%(jmap)s::json->>{res_id}::varchar)::int4 | |
WHERE {where} | |
AND {res_id} IN %(old)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# reference fields | |
cmap, cmap_keys = genmap("%s,%%d" % model_src, "%s,%%d" % model_dst) | |
cr.execute("SELECT model, name FROM ir_model_fields WHERE ttype='reference'") | |
for model, column in cr.fetchall(): | |
table = table_of_model(cr, model) | |
if column_exists(cr, table, column): | |
_params = {"table": sql.Identifier(table), "column": sql.Identifier(column)} | |
_query = sql.SQL( | |
""" | |
UPDATE {table} | |
SET {column} = %(cmap)s::json->>{column} | |
WHERE {column} IN %(cmap_keys)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
# ---------- UI Utilities (Views, Menus) ---------- | |
def _update_view_key(cr, old, new): | |
"""Update the key of a view.""" | |
if not column_exists(cr, "ir_ui_view", "key"): | |
return | |
_query = sql.SQL( | |
""" | |
UPDATE ir_ui_view v | |
SET key = CONCAT(%(new)s, '.', x.name) | |
FROM ir_model_data x | |
WHERE x.model = 'ir.ui.view' | |
AND x.res_id = v.id | |
AND x.module = %(old)s | |
AND v.key = CONCAT(x.module, '.', x.name) | |
""" | |
) | |
cr.execute(_query, locals()) | |
def remove_view( | |
cr, | |
xml_id=None, | |
view_id=None, | |
deactivate_custom=DROP_DEPRECATED_CUSTOM, | |
silent=False, | |
): | |
""" | |
Recursively delete the given view and its inherited views. | |
Delete the view and its inherited views as long as they are part of a module. | |
Will crash as soon as a custom view exists anywhere | |
in the hierarchy. | |
:param xml_id=None: fully qualified xmlid of the view | |
:param view_id=None: id of the view | |
:param deactivate_custom=False: if set, any custom view inheriting from | |
any of the deleted views will be | |
deactivated, otherwise a MigrationError | |
will be raised if a custom view exists; | |
can be set by the system environment | |
variable OE_DROP_DEPRECATED_CUSTOM | |
:param silent=False: if True, no log output will be generated | |
Note that you can either provide an xml_id or view_id but not both. | |
""" | |
assert bool(xml_id) ^ bool(view_id), "You Must specify either xmlid or view_id" | |
if xml_id: | |
view_id = ref(cr, xml_id) | |
if not view_id: | |
return | |
module, _, name = xml_id.partition(".") | |
_query = sql.SQL( | |
""" | |
SELECT model | |
FROM ir_model_data | |
WHERE module = %(module)s | |
AND name = %(name)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
[model] = cr.fetchone() | |
if model != "ir.ui.view": | |
raise ValueError( | |
"{!r} should point to a 'ir.ui.view', not a {!r}".format(xml_id, model) | |
) | |
elif not silent or deactivate_custom: | |
# search matching xmlid for logging or renaming of custom views | |
_query = sql.SQL( | |
""" | |
SELECT module, | |
name | |
FROM ir_model_data | |
WHERE model='ir.ui.view' | |
AND res_id = %(view_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
if cr.rowcount: | |
xml_id = "%s.%s" % cr.fetchone() | |
else: | |
xml_id = "?" | |
_query = sql.SQL( | |
""" | |
SELECT v.id, | |
x.module || '.' || x.name | |
FROM ir_ui_view v | |
LEFT JOIN ir_model_data x ON ( v.id = x.res_id | |
AND x.model = 'ir.ui.view' | |
AND x.module !~ '^_' ) | |
WHERE v.inherit_id = %(view_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
for child_id, child_xml_id in cr.fetchall(): | |
if child_xml_id: | |
if not silent: | |
_logger.info( | |
"Dropping deprecated built-in view %s (ID %s), " | |
"as parent %s (ID %s) is going to be removed", | |
child_xml_id, | |
child_id, | |
xml_id, | |
view_id, | |
) | |
remove_view( | |
cr, child_xml_id, deactivate_custom=deactivate_custom, silent=True | |
) | |
else: | |
if deactivate_custom: | |
if not silent: | |
_logger.warning( | |
"Deactivating deprecated custom view with " | |
"ID %s, as parent %s (ID %s) was removed", | |
child_id, | |
xml_id, | |
view_id, | |
) | |
disable_view_query = sql.SQL( | |
""" | |
UPDATE ir_ui_view | |
SET name = (name || ' - old view, inherited from ' || %(xml_id)s), | |
model = (model || '.disabled'), | |
inherit_id = NULL | |
WHERE id = %(child_id)s | |
""" | |
) | |
cr.execute(disable_view_query, locals()) | |
else: | |
raise MigrationError( | |
"Deprecated custom view with ID %s needs migration, " | |
"as parent %s (ID %s) is going to be removed" | |
% (child_id, xml_id, view_id) | |
) | |
if not silent: | |
_logger.info("Dropping deprecated built-in view %s (ID %s).", xml_id, view_id) | |
remove_record(cr, xml_id) | |
@contextmanager | |
def edit_view(cr, xmlid=None, view_id=None, skip_if_noupdate=False): | |
"""Contextmanager that may yield etree arch of a view. | |
As it may not yield, you must use `skippable_cm`: | |
with migration.skippable_cm(), migration.edit_view(cr, 'xmlid') as arch: | |
arch.attrib['string'] = 'My Form' | |
:param xml_id=None: fully qualified xmlid of the view | |
:param view_id=None: id of the view | |
:param skip_if_noupdate=False: if True, will not yield if the view is | |
set to be non-updatable | |
Note that you can either provide an xml_id or view_id but not both. | |
""" | |
assert bool(xmlid) ^ bool(view_id), "You Must specify either xmlid or view_id" | |
noupdate = True | |
if xmlid: | |
if "." not in xmlid: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
module, _, name = xmlid.partition(".") | |
_query = sql.SQL( | |
""" | |
SELECT res_id, | |
noupdate | |
FROM ir_model_data | |
WHERE module = %(module)s | |
AND name = %(name)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
data = cr.fetchone() | |
if data: | |
view_id, noupdate = data | |
if view_id and not (skip_if_noupdate and noupdate): | |
_query = sql.SQL( | |
""" | |
SELECT arch_db | |
FROM ir_ui_view | |
WHERE id = %(view_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
[arch] = cr.fetchone() or [None] | |
if arch: | |
arch = lxml.etree.fromstring(arch) | |
yield arch | |
arch = lxml.etree.tostring(arch, encoding="unicode") | |
_query = sql.SQL( | |
""" | |
UPDATE ir_ui_view | |
SET arch_db = %(arch)s | |
WHERE id = %(view_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
def remove_menus(cr, menu_ids): | |
"""Remove ir.ui.menu records with the provided ids (and their children).""" | |
if not menu_ids: | |
return | |
menu_ids = tuple(menu_ids) | |
_query = sql.SQL( | |
""" | |
WITH RECURSIVE tree(id) AS | |
( SELECT id | |
FROM ir_ui_menu | |
WHERE id IN %(menu_ids)s | |
UNION SELECT m.id | |
FROM ir_ui_menu m | |
JOIN tree t ON (m.parent_id = t.id) ) | |
DELETE | |
FROM ir_ui_menu m USING tree t | |
WHERE m.id = t.id RETURNING m.id | |
""" | |
) | |
cr.execute(_query, locals()) | |
ids = tuple(x[0] for x in cr.fetchall()) | |
if ids: | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_data | |
WHERE model='ir.ui.menu' | |
AND res_id IN %(ids)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
# ---------- Database/Postgres Utilities ---------- | |
def dbuuid(cr): | |
"""Get the uuid of the current database. | |
In the case of a duplicated database, return the original uuid.""" | |
_query = sql.SQL( | |
""" | |
SELECT value | |
FROM ir_config_parameter | |
WHERE KEY IN ('database.uuid', | |
'origin.database.uuid') | |
ORDER BY KEY DESC LIMIT 1 | |
""" | |
) | |
cr.execute(_query, locals()) | |
return cr.fetchone()[0] | |
def has_enterprise(): | |
"""Check if the current installation has enterprise addons availables or not.""" | |
return bool( | |
get_module_path("web_enterprise", downloaded=False, display_warning=False) | |
) | |
def model_of_table(cr, table): | |
"""Return the model name for the provided table.""" | |
return { | |
# could also be ir.actions.act_window_close, but we have yet | |
# to encounter a case where we need it | |
"ir_actions": "ir.actions.actions", | |
"ir_act_url": "ir.actions.act_url", | |
"ir_act_window": "ir.actions.act_window", | |
"ir_act_window_view": "ir.actions.act_window.view", | |
"ir_act_client": "ir.actions.client", | |
"ir_act_report_xml": "ir.actions.report", | |
"ir_act_server": "ir.actions.server", | |
"ir_act_wizard": "ir.actions.wizard", | |
"ir_config_parameter": "ir.config_parameter", | |
}.get(table, table.replace("_", ".")) | |
def table_of_model(cr, model): | |
"""Return the table for the provided model name.""" | |
return { | |
"ir.actions.actions": "ir_actions", | |
"ir.actions.act_url": "ir_act_url", | |
"ir.actions.act_window": "ir_act_window", | |
"ir.actions.act_window_close": "ir_actions", | |
"ir.actions.act_window.view": "ir_act_window_view", | |
"ir.actions.client": "ir_act_client", | |
"ir.actions.report.xml": "ir_act_report_xml", | |
"ir.actions.report": "ir_act_report_xml", | |
"ir.actions.server": "ir_act_server", | |
"ir.actions.wizard": "ir_act_wizard", | |
}.get(model, model.replace(".", "_")) | |
def table_exists(cr, table): | |
"""Check if the specified table exists.""" | |
_query = sql.SQL( | |
""" | |
SELECT 1 | |
FROM information_schema.tables | |
WHERE table_name = %(table)s | |
AND table_type = 'BASE TABLE' | |
""" | |
) | |
cr.execute(_query, locals()) | |
return cr.fetchone() is not None | |
def column_exists(cr, table, column): | |
"""Check if the column exist on the specified table.""" | |
return column_type(cr, table, column) is not None | |
def column_type(cr, table, column): | |
"""Get the type of the column on the specified table.""" | |
_query = sql.SQL( | |
""" | |
SELECT udt_name | |
FROM information_schema.columns | |
WHERE table_name = %(table)s | |
AND column_name = %(column)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
r = cr.fetchone() | |
return r[0] if r else None | |
def create_column(cr, table, column, definition): | |
"""Create a column on the specified table. | |
:param str table: name of the table on which the column will be created | |
:param str column: name of the new column | |
:param str definition: SQL-style definition of the table type | |
e.g. `boolean` or `varchar(256)` | |
""" | |
curtype = column_type(cr, table, column) | |
if curtype: | |
# TODO compare with definition | |
pass | |
else: | |
_params = { | |
"table": sql.Identifier(table), | |
"column": sql.Identifier(column), | |
"definition": sql.SQL(definition), | |
} | |
_query = sql.SQL( | |
""" | |
ALTER TABLE {table} ADD COLUMN {column} {definition} | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def remove_column(cr, table, column, cascade=False): | |
"""Remove a column. | |
:param str table: name of the table on which the column will be dropped | |
:param str column: name of the column to drop | |
:param bool cascade: if True, all records having a FKEY pointing to this column | |
will be cascade-deleted (default: False) | |
""" | |
if column_exists(cr, table, column): | |
drop_depending_views(cr, table, column) | |
_params = { | |
"table": sql.Identifier(table), | |
"column": sql.Identifier(column), | |
"drop_cascade": sql.SQL("CASCADE" if cascade else ""), | |
} | |
_query = sql.SQL( | |
""" | |
ALTER TABLE {table} DROP COLUMN {column} {drop_cascade} | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def get_columns(cr, table, ignore=("id",), extra_prefixes=None): | |
""" | |
Get the list of column in a table, minus ignored ones. | |
Can also returns the list multiple times with different prefixes. | |
This can be used to duplicating records (INSERT SELECT from the same table) | |
:param table: table toinspect | |
:param ignore=('id'): tuple of column names to ignore | |
""" | |
select = sql.SQL("quote_ident(column_name)") | |
params = [] | |
if extra_prefixes: | |
select = sql.SQL(",").join( | |
[select] | |
+ [sql.SQL("concat(%s, '.', {select}").format(select)] * len(extra_prefixes) | |
) | |
params = list(extra_prefixes) | |
_params = {"select": select} | |
_query = sql.SQL( | |
""" | |
SELECT {select} | |
FROM information_schema.columns | |
WHERE table_name = %s | |
AND column_name NOT IN %s | |
""" | |
).format(**_params) | |
cr.execute(_query, params + [table, ignore]) # Params is a list of unnamed args | |
return list(zip(*cr.fetchall())) | |
def get_depending_views(cr, table, column): | |
"""Get the list of SQL views depending on the specified column.""" | |
# http://stackoverflow.com/a/11773226/75349 | |
_query = sql.SQL( | |
""" | |
SELECT DISTINCT quote_ident(dependee.relname) | |
FROM pg_depend | |
JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid | |
JOIN pg_class AS dependee ON pg_rewrite.ev_class = dependee.oid | |
JOIN pg_class AS dependent ON pg_depend.refobjid = dependent.oid | |
JOIN pg_attribute ON pg_depend.refobjid = pg_attribute.attrelid | |
AND pg_depend.refobjsubid = pg_attribute.attnum | |
WHERE dependent.relname = %(table)s | |
AND pg_attribute.attnum > 0 | |
AND pg_attribute.attname = %(column)s | |
AND dependee.relkind='v' | |
""" | |
) | |
cr.execute(_query, locals()) | |
return map(itemgetter(0), cr.fetchall()) | |
def drop_depending_views(cr, table, column): | |
"""Drop views depending on a column. | |
This is usually used to ensure that modifying fields will not make | |
SQL views (auto=False models) crash after the upgrade, forcing them | |
to be regenerated during the uprade. | |
""" | |
for view in get_depending_views(cr, table, column): | |
_params = {"view": sql.Identifier(view)} | |
_query = sql.SQL( | |
""" | |
DROP VIEW IF EXISTS {} CASCADE | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def get_fk(cr, table): | |
"""Get the list of foreign keys pointing to `table` | |
:rtype: list(tuple) | |
:return: [(foreign_table, foreign_column, constraint_name, on_delete_action)] | |
where on_delete_action if one of the following: | |
- a: no action | |
- r: restrict | |
- c: cascade | |
- n: set null | |
- d: set default | |
""" | |
_query = sql.SQL( | |
""" | |
SELECT quote_ident(cl1.relname) AS TABLE, | |
quote_ident(att1.attname) AS COLUMN, | |
quote_ident(con.conname) AS conname, | |
con.confdeltype | |
FROM pg_constraint AS con, | |
pg_class AS cl1, | |
pg_class AS cl2, | |
pg_attribute AS att1, | |
pg_attribute AS att2 | |
WHERE con.conrelid = cl1.oid | |
AND con.confrelid = cl2.oid | |
AND array_lower(con.conkey, 1) = 1 | |
AND con.conkey[1] = att1.attnum | |
AND att1.attrelid = cl1.oid | |
AND cl2.relname = %(table)s | |
AND att2.attname = 'id' | |
AND array_lower(con.confkey, 1) = 1 | |
AND con.confkey[1] = att2.attnum | |
AND att2.attrelid = cl2.oid | |
AND con.contype = 'f' | |
""" | |
) | |
cr.execute(_query, locals()) | |
return cr.fetchall() | |
def delete_unused(cr, table, xmlids, set_noupdate=True): | |
""" | |
Delete all records in the provided list not being referenced in any FKEY. | |
Note that the xmlids themselves are not removed. | |
:param table: target table for the deletion | |
:param xmlids: list of xmlids to be checked and potentially deleted | |
:param set_noupdate=True: if set, the noupdate field of all the provided | |
xmlids will be set to True | |
""" | |
sub = sql.SQL(" UNION ").join( | |
[ | |
sql.SQL( | |
""" | |
SELECT 1 | |
FROM "{}" x | |
WHERE x."{}" = t.id | |
""".format( | |
f[0], f[1] | |
) | |
) | |
for f in get_fk(cr, table) | |
] | |
) | |
idmap = {ref(cr, x): x for x in xmlids} | |
idmap.pop(None, None) | |
if not sub or not idmap: | |
return | |
idmap = list(idmap) | |
_params = {"table": sql.Identifier(table), "subquery": sub} | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM {table} t | |
WHERE id = ANY(%(idmap)s) | |
AND NOT EXISTS({subquery}) | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
for (tid,) in cr.fetchall(): | |
remove_record(cr, idmap.pop(tid)) | |
if set_noupdate: | |
for xid in idmap.values(): | |
force_noupdate(cr, xid, True) | |
def get_index_on(cr, table, *columns): | |
"""Get the list of indexes on the provided column names. | |
:rtype: list(tuple) | |
:return:a [(index_name, unique, pk)] | |
""" | |
columns = sorted(columns) | |
_query = sql.SQL( | |
""" | |
SELECT name, | |
indisunique, | |
indisprimary | |
FROM | |
(SELECT quote_ident(i.relname) AS name, | |
x.indisunique, | |
x.indisprimary, | |
array_agg(a.attname::text | |
ORDER BY a.attname) AS attrs | |
FROM | |
(SELECT *, | |
unnest(indkey) AS unnest_indkey | |
FROM pg_index) x | |
JOIN pg_class c ON c.oid = x.indrelid | |
JOIN pg_class i ON i.oid = x.indexrelid | |
JOIN pg_attribute a ON (a.attrelid=c.oid | |
AND a.attnum=x.unnest_indkey) | |
WHERE (c.relkind = ANY (ARRAY['r'::"char", 'm'::"char"])) | |
AND i.relkind = 'i'::"char" | |
AND c.relname = %(table)s | |
GROUP BY 1, | |
2, | |
3 ) idx | |
WHERE attrs = %(columns)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
return cr.fetchone() | |
def pg_array_uniq(a, drop_null=False): | |
"""???""" | |
dn = "WHERE x IS NOT NULL" if drop_null else "" | |
return "ARRAY(SELECT x FROM unnest({}) x {} GROUP BY x)".format(a, dn) | |
def pg_html_escape(s, quote=True): | |
"""SQL version of html.escape""" | |
replacements = [("&", "&"), ("<", "<"), (">", ">")] # Must be done first! | |
if quote: | |
replacements += [('"', """), ("'", "'")] | |
def q(s): | |
return ( | |
psycopg2.extensions.QuotedString(s).getquoted().decode("utf-8") | |
) # noqa: E704 | |
return reduce( | |
lambda s, r: "replace({}, {}, {})".format(s, q(r[0]), q(r[1])), replacements, s | |
) | |
def pg_text2html(s): | |
return r"CONCAT('<p>', replace({}, E'\n', '<br>'), '</p>')".format( | |
pg_html_escape(s) | |
) | |
def view_exists(cr, view): | |
"""Check if the specified SQL view exists.""" | |
_query = sql.SQL( | |
""" | |
SELECT 1 | |
FROM information_schema.views | |
WHERE TABLE_NAME=%(view)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
return bool(cr.rowcount) | |
# ----------------- Utils ----------------- | |
def remove_record(cr, name, deactivate=False, active_field="active"): | |
""" | |
Remove a record from the database by xmlid. | |
:param str name: fully qualified xmlid (<module>.<name>) | |
:param boolean deactivate: if True, the record may be archived if | |
deletion is impossible (eg FKEY constraint) | |
:param str active_field: name of the field to use for deactivation, | |
'active' by default | |
""" | |
if "." not in name: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
module, name = name.split(".") | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM ir_model_data | |
WHERE module = %(module)s | |
AND name = %(name)s RETURNING model, res_id | |
""" | |
) | |
cr.execute(_query, locals()) | |
data = cr.fetchone() | |
if not data: | |
return | |
model, res_id = data | |
table = table_of_model(cr, model) | |
try: | |
with savepoint(cr): | |
cr.execute('DELETE FROM "%s" WHERE id=%%s' % table, (res_id,)) | |
except Exception: | |
if not deactivate or not active_field: | |
raise | |
_params = { | |
"table": sql.Identifier(table), | |
"active_field": sql.Identifier(active_field), | |
} | |
_query = sql.SQL( | |
""" | |
UPDATE {table} | |
SET {active_field} = FALSE | |
WHERE id = %(res_id)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
else: | |
# delete all indirect references to the record (e.g. mail_message entries, etc.) | |
for ir in indirect_references(cr, bound_only=True): | |
_params = { | |
"table": sql.Identifier(ir.table), | |
"where_clause": sql.SQL(ir.model_filter(placeholder="%(model)s")), | |
"res_id": sql.Identifier(ir.res_id), | |
} | |
_query = sql.SQL( | |
""" | |
DELETE | |
FROM {table} | |
WHERE {where_clause} | |
AND {res_id} = %(res_id)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
def splitlines(s): | |
"""Yield stripped lines of `s`. | |
Skip empty lines & remove comments (starts with `#`). | |
""" | |
return (sl for l in s.splitlines() for sl in [l.split("#", 1)[0].strip()] if sl) | |
def expand_braces(s): | |
"""Expand braces (a la bash). | |
Only handle one expension of a 2 parts (because we don't need more). | |
""" | |
r = re.compile(r"(.*){([^},]*?,[^},]*?)}(.*)") | |
m = r.search(s) | |
if not m: | |
raise ValueError("No braces to expand") | |
head, match, tail = m.groups() | |
a, b = match.split(",") | |
return [head + a + tail, head + b + tail] | |
class IndirectReference( | |
namedtuple("IndirectReference", "table res_model res_id res_model_id") | |
): | |
def model_filter(self, prefix="", placeholder="%s"): | |
if prefix and prefix[-1] != ".": | |
prefix += "." | |
if self.res_model_id: | |
placeholder = "(SELECT id FROM ir_model WHERE model={})".format(placeholder) | |
column = self.res_model_id | |
else: | |
column = self.res_model | |
return '{}"{}"={}'.format(prefix, column, placeholder) | |
# allow the class to handle defaults implicitely | |
IndirectReference.__new__.__defaults__ = ( | |
None, | |
None, | |
) # https://stackoverflow.com/a/18348004 | |
def indirect_references(cr, bound_only=False): | |
IR = IndirectReference | |
each = [ | |
IR("ir_attachment", "res_model", "res_id"), | |
IR("ir_cron", "model", None), | |
IR("ir_act_report_xml", "model", None), | |
IR("ir_act_window", "res_model", "res_id"), | |
IR("ir_act_window", "src_model", None), | |
IR("ir_act_server", "wkf_model_name", None), | |
IR("ir_act_server", "crud_model_name", None), | |
IR("ir_act_client", "res_model", None), | |
IR("ir_model", "model", None), | |
IR("ir_model_fields", "model", None), | |
# destination of a relation field | |
IR("ir_model_fields", "relation", None), | |
IR("ir_model_data", "model", "res_id"), | |
IR("ir_filters", "model_id", None), # YUCK!, not an id | |
IR("ir_exports", "resource", None), | |
IR("ir_ui_view", "model", None), | |
IR("ir_values", "model", "res_id"), | |
IR("wkf_transition", "trigger_model", None), | |
IR("wkf_triggers", "model", None), | |
IR("ir_model_fields_anonymization", "model_name", None), | |
IR("ir_model_fields_anonymization_migration_fix", "model_name", None), | |
IR("base_import_import", "res_model", None), | |
IR("calendar_event", "res_model", "res_id"), # new in saas~18 | |
IR("mail_template", "model", None), | |
IR("mail_activity", "res_model", "res_id", "res_model_id"), | |
IR("mail_alias", None, "alias_force_thread_id", "alias_model_id"), | |
IR("mail_alias", None, "alias_parent_thread_id", "alias_parent_model_id"), | |
IR("mail_followers", "res_model", "res_id"), | |
IR("mail_message_subtype", "res_model", None), | |
IR("mail_message", "model", "res_id"), | |
IR("mail_compose_message", "model", "res_id"), | |
IR("mail_wizard_invite", "res_model", "res_id"), | |
IR("mail_mail_statistics", "model", "res_id"), | |
IR("mail_mass_mailing", "mailing_model", None), | |
IR("project_project", "alias_model", None), | |
IR("rating_rating", "res_model", "res_id", "res_model_id"), | |
IR("rating_rating", "parent_res_model", "parent_res_id", "parent_res_model_id"), | |
] | |
for ir in each: | |
if bound_only and not ir.res_id: | |
continue | |
if ir.res_id and not column_exists(cr, ir.table, ir.res_id): | |
continue | |
# some `res_model/res_model_id` combination may change between | |
# versions (i.e. rating_rating.res_model_id was added in saas~15). | |
# we need to verify existance of columns before using them. | |
if ir.res_model and not column_exists(cr, ir.table, ir.res_model): | |
ir = ir._replace(res_model=None) | |
if ir.res_model_id and not column_exists(cr, ir.table, ir.res_model_id): | |
ir = ir._replace(res_model_id=None) | |
if not ir.res_model and not ir.res_model_id: | |
continue | |
yield ir | |
def res_model_res_id(cr): | |
"""Iterate on base models having a field that references records by model/id. | |
Allow iterating on all models that reference records by using a | |
res_model/res_id field combiination of similar reference system; | |
usually used to quickly iterate over basic ORM models (views, crons, etc.) | |
that references other models in an indirect way (without FKEY). | |
:rtype tuple: | |
:return: tuple containing the model, model reference field name | |
and id reference field name | |
""" | |
for ir in indirect_references(cr): | |
if ir.res_model: | |
yield model_of_table(cr, ir.table), ir.res_model, ir.res_id | |
@contextmanager | |
def skippable_cm(): | |
"""Allow a contextmanager to not yield.""" | |
if not hasattr(skippable_cm, "_msg"): | |
@contextmanager | |
def _(): | |
if 0: | |
yield | |
try: | |
with _(): | |
pass | |
except RuntimeError as r: | |
skippable_cm._msg = str(r) | |
try: | |
yield | |
except RuntimeError as r: | |
if str(r) != skippable_cm._msg: | |
raise | |
@contextmanager | |
def savepoint(cr): | |
"""Provide a savepoint context. | |
If a query executed in this context fails, the operation is rollbacked, | |
otherwise it success silently. | |
""" | |
name = hex(int(time.time() * 1000))[1:] | |
cr.execute("SAVEPOINT {}".format(name)) | |
try: | |
yield | |
cr.execute("RELEASE SAVEPOINT {}".format(name)) | |
except Exception: | |
cr.execute("ROLLBACK TO SAVEPOINT {}".format(name)) | |
raise | |
def log_progress(it, qualifier="elements", logger=_logger, size=None): | |
if size is None: | |
size = len(it) | |
size = float(size) | |
t0 = t1 = datetime.datetime.now() | |
for i, e in enumerate(it, 1): | |
yield e | |
t2 = datetime.datetime.now() | |
if (t2 - t1).total_seconds() > 60: | |
t1 = datetime.datetime.now() | |
tdiff = t2 - t0 | |
logger.info( | |
"[%.02f%%] %d/%d %s processed in %s (TOTAL estimated time: %s)", | |
(i / size * 100.0), | |
i, | |
size, | |
qualifier, | |
tdiff, | |
datetime.timedelta(seconds=tdiff.total_seconds() * size / i), | |
) | |
def env(cr): | |
"""Get an environment for the SUPERUSER ('admin').""" | |
from odoo.api import Environment | |
return Environment(cr, SUPERUSER_ID, {}) | |
def import_script(path): | |
"""Import a script from another module. | |
This can be used if some changes have been applied across the whole | |
codebase but would need module-specific changes; e.g. a generic script | |
can be defined in the `base` module and called by any relevant module | |
who might want to use it to adapt something depending on values depending | |
from that module. | |
:param path: relative path to the script | |
<module>/<versions>/<script> | |
e.g. base/12.0.1.3/do_stuff.py | |
""" | |
name, _ = os.path.splitext(os.path.basename(path)) | |
full_path = os.path.join(os.path.dirname(__file__), path) | |
with open(full_path) as fp: | |
return imp.load_source(name, full_path, fp) | |
def dispatch_by_dbuuid(cr, version, callbacks): | |
"""Apply dbuuid-specific migrations. | |
Allow defining custom migration functions that can be applied on a | |
single database identified by its uuid, e.g.: | |
_db_callback(cr, version): | |
# do stuff specific to that db | |
def migrate(cr, version): | |
migration.dispatch_by_dbuuid(cr, version, { | |
'88ef269b-f6de-4f76-b9c8-868fc5569136': _db_callback, | |
}) | |
The callback function _db_callback should have the same signature as a | |
`migrate` function, taking the cursor `cr` and `version` as args. | |
:param version: target version for this upgrade | |
:param callbacks: dict where each key is a uuid and value is a | |
reference to the callback function | |
""" | |
uuid = dbuuid(cr) | |
if uuid in callbacks: | |
func = callbacks[uuid] | |
_logger.info("calling dbuuid-specific function `%s`", func.__name__) | |
func(cr, version) | |
def register_unanonymization_query( | |
cr, model, field, query, query_type="sql", sequence=10 | |
): | |
""" | |
Generate an unanonymization query. | |
Allow newly created fields, values, etc. to be deanonymzed even though they | |
were not present during anonymization (e.g. when values are moved/copied). | |
:param model: | |
:param field: | |
:param query: | |
:param query_type='sql': | |
:param sequence=10: | |
""" | |
target_version = release.major_version | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_model_fields_anonymization_migration_fix( target_version, sequence, query_type, model_name, field_name, query ) | |
VALUES (%(target_version)s, | |
%(sequence)s, | |
%(query_type)s, | |
%(model)s, | |
%(field)s, | |
%(query)s) | |
""" | |
) | |
cr.execute(_query, locals()) | |
def _rst2html(rst): | |
"""Convert rst to html.""" | |
overrides = dict( | |
embed_stylesheet=False, | |
doctitle_xform=False, | |
output_encoding="unicode", | |
xml_declaration=False, | |
) | |
html = publish_string( | |
source=dedent(rst), settings_overrides=overrides, writer=MyWriter() | |
) | |
return html_sanitize(html, silent=False) | |
def _md2html(md): | |
"""Convert markdown to html.""" | |
extensions = [ | |
"markdown.extensions.smart_strong", | |
"markdown.extensions.nl2br", | |
"markdown.extensions.sane_lists", | |
] | |
return markdown.markdown(md, extensions=extensions) | |
# ---------- xmlid utilities ---------- | |
def ref(cr, xmlid): | |
"""Get the id of an xmlid entry.""" | |
if "." not in xmlid: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
module, name = xmlid.split(".") | |
_query = sql.SQL( | |
""" | |
SELECT res_id | |
FROM ir_model_data | |
WHERE module = %(module)s | |
AND name = %(name)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
data = cr.fetchone() | |
if data: | |
return data[0] | |
return None | |
def rename_xmlid(cr, old, new, noupdate=None): | |
"""Rename an xmlid. | |
In the case of a view xmlid, the key if the view is updated as well. | |
""" | |
if "." not in old or "." not in new: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
old_module, old_name = old.split(".") | |
new_module, new_name = new.split(".") | |
noupdate = sql.SQL( | |
"" if noupdate is None else (", noupdate=" + str(bool(noupdate)).lower()) | |
) | |
_params = {"noupdate": noupdate} | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET module = %(new_module)s, | |
name=%(new_name)s {noupdate} | |
WHERE module = %(old_module)s | |
AND name = %(old_name)s RETURNING model, | |
res_id | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
data = cr.fetchone() | |
if data: | |
model, rid = data | |
if model == "ir.ui.view": | |
_update_view_key(cr, old, new) | |
_query = sql.SQL( | |
""" | |
UPDATE ir_ui_view | |
SET key = %(new)s | |
WHERE id = %(rid)s | |
AND key = %(old)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
return rid | |
return None | |
def force_noupdate(cr, xmlid, noupdate=True, warn=False): | |
"""Force the noupdate value of an xmlid.""" | |
if "." not in xmlid: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
module, name = xmlid.split(".") | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET noupdate = %(noupdate)s | |
WHERE module = %(module)s | |
AND name = %(name)s | |
AND noupdate != %(noupdate)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
if noupdate is False and cr.rowcount and warn: | |
_logger.warning("Customizations on `%s` might be lost!", xmlid) | |
return cr.rowcount | |
def ensure_xmlid_match_record(cr, xmlid, model, values): | |
"""Ensure the provided values matches the provided xmlid. | |
Check the provided model table for the presence of a record matching | |
the given xmlid. If the record is not found, searches the table for | |
any record matching the provided values and associate it to the xmlid. | |
:param str xmlid: fully qualified xmlid (<module>.<name>) | |
:param str model: name of the model to check (Odoo model name) | |
:param tuple values: column name and value to check if the record needs | |
to be found in the table (will be used in a WHERE | |
query) | |
:rtype: integer | |
:return: ID of the record | |
""" | |
if "." not in xmlid: | |
raise ValueError("Please use fully qualified name <module>.<name>") | |
module, name = xmlid.split(".") | |
_query = sql.SQL( | |
""" | |
SELECT id, | |
res_id | |
FROM ir_model_data | |
WHERE module = %(module)s | |
AND name = %(name)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
table = table_of_model(cr, model) | |
data = cr.fetchone() | |
if data: | |
data_id, res_id = data | |
# check that record still exists | |
_params = {"table": sql.Identifier(table)} | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM {table} | |
WHERE id = %(res_id)s | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
if cr.fetchone(): | |
return res_id | |
else: | |
data_id = None | |
# search for existing record marching values | |
where = sql.Composed([]) | |
data = () | |
for field, value in values.items(): | |
_params = {"field": sql.Identifier(field)} | |
if value: | |
_params["value"] = sql.Literal(value) | |
where += sql.Composed([sql.SQL("{field} = {value}").format(**_params)]) | |
data += (value,) | |
else: | |
where += sql.Composed([sql.SQL("{field} IS NULL").format(**_params)]) | |
data += () | |
_params = {"table": sql.Identifier(table), "where": where.join(" AND ")} | |
_query = sql.SQL( | |
""" | |
SELECT id | |
FROM {table} | |
WHERE {where} | |
""" | |
).format(**_params) | |
cr.execute(_query, locals()) | |
record = cr.fetchone() | |
if not record: | |
return None | |
res_id = record[0] | |
# update xmlid table | |
if data_id: | |
_query = sql.SQL( | |
""" | |
UPDATE ir_model_data | |
SET res_id = %(res_id)s | |
WHERE id = %(data_id)s | |
""" | |
) | |
cr.execute(_query, locals()) | |
else: | |
_query = sql.SQL( | |
""" | |
INSERT INTO ir_model_data (module, name, model, res_id, noupdate) | |
VALUES (%(module)s, | |
%(name)s, | |
%(model)s, | |
%(res_id)s, | |
TRUE) | |
""" | |
) | |
cr.execute(_query, locals()) | |
return res_id | |
# -------- Announcement Message -------- | |
_DEFAULT_HEADER = """ | |
<p>{module} has been upgraded to version {version}.</p> | |
<h2>What's new in this upgrade?</h2> | |
""" | |
_DEFAULT_FOOTER = "<p>Enjoy this new version of {module}!</p>" | |
_DEFAULT_RECIPIENT = "mail.channel_all_employees" | |
def announce( | |
cr, | |
module, | |
version, | |
msg, | |
format="rst", | |
recipient=_DEFAULT_RECIPIENT, | |
header=_DEFAULT_HEADER, | |
footer=_DEFAULT_FOOTER, | |
pluses_for_enterprise=None, | |
): | |
""" | |
Post an upgrade message in the selected channel detailing the upgrade. | |
:param module: nmae of the upgraded module | |
:param version: target upgrade version | |
:param msg: message regarding the upgrade | |
:param format='rst': format of the message | |
('rst' for ReStructured Text or 'md' for markdown) | |
:param recipient: xmlid of the channel where the message will be posted | |
:param header: header of the message (set False for no header) | |
:param footer: footer of the message (set False for no footer) | |
:param pluses_for_enterprise=None: if True, list elements in your message | |
prefixed with a '+ ' string will be | |
filtered out if the upgraded database | |
does not have the Enterprise edition | |
""" | |
if pluses_for_enterprise: | |
plus_re = r"^(\s*)\+ (.+)\n" | |
replacement = r"\1- \2\n" if has_enterprise() else "" | |
msg = re.sub(plus_re, replacement, msg, flags=re.M) | |
# do not notify early, in case the migration fails halfway through | |
ctx = {"mail_notify_force_send": False, "mail_notify_author": True} | |
try: | |
registry = env(cr) | |
user = registry["res.users"].browse([SUPERUSER_ID])[0].with_context(ctx) | |
def ref(xid): | |
return registry.ref(xid).with_context(ctx) | |
except MigrationError: | |
registry = Registry.get(cr.dbname) | |
user = registry["res.users"].browse(cr, SUPERUSER_ID, SUPERUSER_ID, context=ctx) | |
def ref(xid): | |
rmod, _, rxid = recipient.partition(".") | |
return registry["ir.model.data"].get_object( | |
cr, SUPERUSER_ID, rmod, rxid, context=ctx | |
) | |
# default recipient | |
poster = user.message_post | |
if recipient: | |
try: | |
poster = ref(recipient).message_post | |
except (ValueError, AttributeError): | |
# Cannot find record, post the message on the wall of the admin | |
pass | |
if format == "rst": | |
msg = _rst2html(msg) | |
elif format == "md": | |
msg = _md2html(msg) | |
message = ((header or "") + msg + (footer or "")).format( | |
module=module or "Odoo", version=version | |
) | |
_logger.debug(message) | |
type_field = "message_type" | |
kw = {type_field: "notification"} | |
try: | |
poster( | |
body=message, | |
partner_ids=[user.partner_id.id], | |
subtype="mail.mt_comment", | |
**kw | |
) | |
except Exception: | |
_logger.warning("Cannot announce message", exc_info=True) | |
# --- NOT SURE IF STILL NEEDED ??? --- # | |
def main(func, version=None): | |
"""a main() function for scripts""" | |
# NOTE: this is not recommanded when the func callback use the ORM as the addon-path is | |
# incomplete. Please pipe your script into `odoo shell`. | |
# Do not forget to commit the cursor at the end. | |
if len(sys.argv) != 2: | |
sys.exit("Usage: {} <dbname>".format(sys.argv[0])) | |
dbname = sys.argv[1] | |
with db_connect(dbname).cursor() as cr, Environment.manage(): | |
func(cr, version) | |
def _ir_values_value(cr): | |
# returns the casting from bytea to text needed in saas~17 for column `value` of `ir_values` | |
# returns tuple(column_read, cast_write) | |
result = getattr(_ir_values_value, "result", None) | |
if result is None: | |
if column_type(cr, "ir_values", "value") == "bytea": | |
cr.execute( | |
"SELECT character_set_name FROM information_schema.character_sets" | |
) | |
charset, = cr.fetchone() | |
column_read = "convert_from(value, '%s')" % charset | |
cast_write = "convert_to(%%s, '%s')" % charset | |
else: | |
column_read = "value" | |
cast_write = "%s" | |
_ir_values_value.result = result = (column_read, cast_write) | |
return result | |
def chunks(iterable, size, fmt=None): | |
""" | |
Split `iterable` into chunks of `size` and wrap each chunk | |
using function 'fmt' (`iter` by default; join strings) | |
>>> list(chunks(range(10), 4, fmt=tuple)) | |
[(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)] | |
>>> ' '.join(chunks('abcdefghijklm', 3)) | |
'abc def ghi jkl m' | |
>>> | |
""" | |
if fmt is None: | |
fmt = "".join | |
it = iter(iterable) | |
try: | |
while True: | |
yield fmt(chain((next(it),), islice(it, size - 1))) | |
except StopIteration: | |
return | |
def iter_browse(model, *args, **kw): | |
""" | |
Iterate and browse through record without filling the cache. | |
`args` can be `cr, uid, ids` or just `ids` depending on kind of `model` (old/new api) | |
""" | |
assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,) | |
cr_uid = args[:-1] | |
ids = args[-1] | |
chunk_size = kw.pop("chunk_size", 200) # keyword-only argument | |
logger = kw.pop("logger", _logger) | |
if kw: | |
raise TypeError("Unknow arguments: %s" % ", ".join(kw)) | |
def browse(ids): | |
model.invalidate_cache(*cr_uid) | |
args = cr_uid + (list(ids),) | |
return model.browse(*args) | |
def end(): | |
model.invalidate_cache(*cr_uid) | |
if 0: | |
yield | |
it = chain.from_iterable(chunks(ids, chunk_size, fmt=browse)) | |
if logger: | |
it = log_progress(it, qualifier=model._name, logger=logger, size=len(ids)) | |
return chain(it, end()) |
IIRC, this has been on a public branch in the odoo
or odoo-dev
public repos around the time of the creation of this gist.
So I'd assume the license would follow odoo's.
Hi David,
Thank you for this script !
Calling your def remove_module in my pre-migrate.py, i got an ERROR from your file (in : def remove_field) during the migration process:
odoo.sql_db:
bad query: SQL('\n\n SELECT array_agg(f.name),\n array_agg(aw.id)\n FROM ir_model_fields f\n JOIN ir_act_window aw ON aw.res_model = f.model\n WHERE f.model = %s\n AND f.name = %s\n GROUP BY f.model\n\n ')
ERROR: dict is not a sequence
################ Odoo.sh Upgrade LOG ################
023-01-28 15:25:03,339 9 INFO oerp-brstaging-7083812 odoo.modules.migration: module website_ticket_event: Running migration [>15.0.0.0.0] pre-migrate
2023-01-28 15:25:03,569 9 ERROR oerp-brstaging-7083812 odoo.sql_db: bad query: SQL('\n\n SELECT array_agg(f.name),\n array_agg(aw.id)\n FROM ir_model_fields f\n JOIN ir_act_window aw ON aw.res_model = f.model\n WHERE f.model = %s\n AND f.name = %s\n GROUP BY f.model\n\n ')
ERROR: dict is not a sequence
2023-01-28 15:25:03,573 9 WARNING oerp-brstaging-7083812 odoo.modules.loading: Transient module states were reset
2023-01-28 15:25:03,576 9 ERROR oerp-brstaging-7083812 odoo.modules.registry: Failed to load registry
2023-01-28 15:25:03,577 9 CRITICAL oerp-brstaging-7083812 odoo.service.server: Failed to initialize database oerp-brstaging-7083812
.
Traceback (most recent call last):
File "/home/odoo/src/odoo/odoo/service/server.py", line 1260, in preload_registries
registry = Registry.new(dbname, update_module=update_module)
File "/home/odoo/src/odoo/odoo/modules/registry.py", line 87, in new
odoo.modules.load_modules(registry, force_demo, status, update_module)
File "/home/odoo/src/odoo/odoo/modules/loading.py", line 470, in load_modules
processed_modules += load_marked_modules(cr, graph,
File "/home/odoo/src/odoo/odoo/modules/loading.py", line 363, in load_marked_modules
loaded, processed = load_module_graph(
File "/home/odoo/src/odoo/odoo/modules/loading.py", line 174, in load_module_graph
migrations.migrate_module(package, 'pre')
File "/home/odoo/src/odoo/odoo/modules/migration.py", line 180, in migrate_module
migrate(self.cr, installed_version)
File "/home/odoo/src/user/website_ticket_event/migrations/15.0.0.0.0/pre-migrate.py", line 56, in migrate
remove_module(cr, "website_google_analytics_4")
File "/home/odoo/src/user/website_ticket_event/migrations/migration.py", line 283, in remove_module
remove_field(cr, model, name)
File "/home/odoo/src/user/website_ticket_event/migrations/migration.py", line 903, in remove_field
cr.execute(_query, locals())
File "", line 2, in execute
File "/home/odoo/src/odoo/odoo/sql_db.py", line 90, in check
return f(self, *args, **kwargs)
File "/home/odoo/src/odoo/odoo/sql_db.py", line 313, in execute
res = self._obj.execute(query, params)
TypeError: dict is not a sequence
2023-01-28 15:25:03,578 9 INFO oerp-brstaging-7083812 odoo.service.server: Initiating shutdown
################################################
Any idea how to fix it ?
Best regards,
Sylvain
Hi David!
I wonder, what is license for these tools?