Skip to content

Instantly share code, notes, and snippets.

@faniska
Created January 20, 2025 14:35
Show Gist options
  • Save faniska/f1d3dfb50dc16ffce4beaf1ff1bb21ac to your computer and use it in GitHub Desktop.
Save faniska/f1d3dfb50dc16ffce4beaf1ff1bb21ac to your computer and use it in GitHub Desktop.
Odoo record merge mixin
from odoo import models
class RecordMergeMixin(models.AbstractModel):
    """Mixin that re-points references from duplicate records to one survivor.

    Models inheriting this mixin can call ``merge(dest_id)`` on a recordset of
    duplicates: every stored many2one / many2many field in other models that
    targets one of the duplicates is rewritten to target ``dest_id`` instead.
    The duplicates themselves are neither deleted nor archived here.
    """

    _name = 'record.merge.mixin'
    _description = 'Record Merge Mixin Abstract'

    def get_referencing_fields(self, exclude_models=None):
        """Return the ``ir.model.fields`` records that point at this model.

        Only stored, non-related many2one and many2many fields are returned.

        :param exclude_models: optional iterable of model names whose fields
            must be left out of the result.
        :return: ``ir.model.fields`` recordset.
        """
        domain = [
            ('relation', '=', self._name),
            ('related', '=', False),
            ('store', '=', True),
            ('ttype', 'in', ('many2one', 'many2many')),
        ]
        if exclude_models:
            domain.append(('model_id.model', 'not in', exclude_models))
        return self.env['ir.model.fields'].search(domain)

    def get_related_objects(self, dest_id):
        """Collect the records that reference any record of ``self`` but ``dest_id``.

        :param dest_id: id of the record that survives the merge; it is
            excluded from the set of ids being looked up.
        :return: nested dict ``{ttype: {model_name: {field_name: recordset}}}``
            with ``ttype`` being ``'many2one'`` or ``'many2many'``. Both
            top-level keys are always present, possibly empty.
        """
        results = {'many2one': {}, 'many2many': {}}
        source_ids = list(set(self.ids) - {dest_id})
        if not source_ids:
            return results

        for ref_field in self.get_referencing_fields():
            model_name = ref_field.model_id.model
            field_name = ref_field.name
            # ir.model.fields may still list models that are not loaded in
            # the current registry; indexing env with them would raise.
            if model_name not in self.env:
                continue
            model = self.env[model_name]
            if model._abstract or model._transient:
                continue
            # Archived rows must be re-pointed too, otherwise they keep
            # referencing the records being merged away.
            related_objects = model.with_context(active_test=False).search(
                [(field_name, 'in', source_ids)]
            )
            if not related_objects:
                continue
            by_field = results[ref_field.ttype].setdefault(model_name, {})
            by_field.setdefault(field_name, related_objects)
        return results

    def merge(self, dest_id):
        """Redirect every reference to the records of ``self`` towards ``dest_id``.

        many2one columns are rewritten with a direct SQL ``UPDATE``;
        many2many relations go through the ORM.

        :param dest_id: id of the record that must receive all references.
        """
        related_objects = self.get_related_objects(dest_id)
        for field_type, model_data in related_objects.items():
            merge_method = 'orm' if field_type == 'many2many' else 'sql'
            for fields_data in model_data.values():
                for field_name, records in fields_data.items():
                    self.update_foreign_keys(
                        merge_method, field_name, field_type, records, dest_id
                    )

    def update_foreign_keys(self, method, field_name, field_type, records, dest_id):
        """Make ``field_name`` on ``records`` reference ``dest_id``.

        :param method: ``'sql'`` or ``'orm'``; any other value is a no-op.
        :param field_name: name of the relational field on ``records``.
        :param field_type: ``'many2one'`` or ``'many2many'``.
        :param records: recordset of the referencing model to update.
        :param dest_id: id that the field must reference afterwards.
        :raises NotImplementedError: for many2many fields with ``method='sql'``.
        """
        if field_type == 'many2one':
            if method == 'sql':
                if not records:
                    # "IN ()" is invalid SQL; nothing to update anyway.
                    return
                # Identifiers cannot be bound as parameters, so they are
                # quoted and interpolated; the values are passed as query
                # parameters (a tuple is adapted to an SQL list for IN).
                query = 'UPDATE "{}" SET "{}" = %s WHERE id IN %s'.format(
                    records._table, field_name
                )
                self.env.cr.execute(query, (dest_id, tuple(records.ids)))
                # The UPDATE bypassed the ORM: drop now-stale cached values.
                self._invalidate_field_cache(records, field_name)
            elif method == 'orm':
                records.write({field_name: dest_id})
        elif field_type == 'many2many':
            if method == 'sql':
                raise NotImplementedError
            elif method == 'orm':
                # Command 3 unlinks an id from the relation and command 4
                # links an existing one. The ids to unlink are the merged
                # records (self), not the ids of the referencing rows.
                commands = [
                    (3, source_id) for source_id in self.ids if source_id != dest_id
                ]
                commands.append((4, dest_id))
                records.write({field_name: commands})

    @staticmethod
    def _invalidate_field_cache(records, field_name):
        """Invalidate cached values of ``field_name`` on ``records``.

        Uses ``invalidate_recordset`` when the running Odoo version provides
        it and falls back to the older ``invalidate_cache`` API otherwise.
        """
        if hasattr(records, 'invalidate_recordset'):
            records.invalidate_recordset([field_name])
        elif hasattr(records, 'invalidate_cache'):
            records.invalidate_cache([field_name], records.ids)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment