Created
January 20, 2025 14:35
-
-
Save faniska/f1d3dfb50dc16ffce4beaf1ff1bb21ac to your computer and use it in GitHub Desktop.
Odoo record merge mixin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from odoo import models
class RecordMergeMixin(models.AbstractModel):
    """Abstract mixin that re-points references from merged records to a survivor.

    Call ``merge(dest_id)`` on the recordset being merged. Every stored,
    non-related many2one / many2many field that targets this model is
    rewritten so that it references ``dest_id`` instead of the other ids in
    the recordset. The source records themselves are neither deleted nor
    archived by this mixin.
    """

    _name = 'record.merge.mixin'
    _description = 'Record Merge Mixin Abstract'

    def get_referencing_fields(self, exclude_models=None):
        """Return the ``ir.model.fields`` records that point at this model.

        Only stored, non-related many2one and many2many fields are returned.

        :param exclude_models: optional collection of model names whose
            fields must be left out of the result.
        :return: ``ir.model.fields`` recordset.
        """
        domain = [
            ('relation', '=', self._name),
            ('related', '=', False),
            ('store', '=', True),
            ('ttype', 'in', ('many2one', 'many2many')),
        ]
        if exclude_models:
            domain.append(('model_id.model', 'not in', exclude_models))
        return self.env['ir.model.fields'].search(domain)

    def get_related_objects(self, dest_id):
        """Collect the records that reference the merged (non-surviving) ids.

        :param dest_id: id of the surviving record; it is excluded from the
            ids that are looked up.
        :return: ``{'many2one': {...}, 'many2many': {...}}`` where each inner
            dict maps ``model_name -> {field_name: recordset}``.
        """
        results = {'many2one': {}, 'many2many': {}}
        source_ids = list(set(self.ids) - {dest_id})
        if not source_ids:
            return results

        for ref_field in self.get_referencing_fields():
            model_name = ref_field.model_id.model
            field_name = ref_field.name
            # ir.model.fields can still list models that are no longer
            # loaded in the registry; indexing env with them raises KeyError.
            if model_name not in self.env:
                continue
            model = self.env[model_name]
            if model._abstract or model._transient:
                continue
            # Archived records must be re-pointed as well, otherwise they
            # keep referencing the merged-away ids.
            related_objects = model.with_context(active_test=False).search(
                [(field_name, 'in', source_ids)]
            )
            if not related_objects:
                continue
            fields_by_name = results[ref_field.ttype].setdefault(model_name, {})
            fields_by_name.setdefault(field_name, related_objects)
        return results

    def merge(self, dest_id):
        """Re-point every reference to the records in ``self`` to ``dest_id``.

        many2one columns are updated with a direct SQL UPDATE; many2many
        relations are updated through the ORM.

        :param dest_id: id of the record that survives the merge.
        """
        related_objects = self.get_related_objects(dest_id)
        for field_type, model_data in related_objects.items():
            merge_method = 'orm' if field_type == 'many2many' else 'sql'
            for fields_data in model_data.values():
                for field_name, records in fields_data.items():
                    self.update_foreign_keys(
                        merge_method, field_name, field_type, records, dest_id
                    )

    def update_foreign_keys(self, method, field_name, field_type, records, dest_id):
        """Make ``field_name`` on ``records`` reference ``dest_id``.

        ``self`` must be the recordset being merged: for many2many fields the
        ids in ``self`` other than ``dest_id`` are the links that get removed.

        :param method: ``'sql'`` or ``'orm'``; other values are ignored.
        :param field_name: name of the relational field on ``records``.
        :param field_type: ``'many2one'`` or ``'many2many'``; other values
            are ignored.
        :param records: recordset of the referencing model to update.
        :param dest_id: id of the surviving record.
        :raises NotImplementedError: for many2many fields with ``'sql'``.
        """
        if field_type == 'many2many' and method == 'sql':
            raise NotImplementedError
        if not records:
            # Nothing to update; also avoids an invalid "IN ()" SQL clause.
            return

        if field_type == 'many2one':
            if method == 'sql':
                # Identifiers cannot be bound as parameters, so they are
                # quoted; the values are passed as query parameters.
                query = 'UPDATE "{table}" SET "{field}" = %s WHERE id IN %s'.format(
                    table=records._table, field=field_name
                )
                self.env.cr.execute(query, (dest_id, tuple(records.ids)))
                # The raw UPDATE bypasses the ORM, so cached values are stale.
                self._invalidate_field_cache(records, field_name)
            elif method == 'orm':
                records.write({field_name: dest_id})
        elif field_type == 'many2many':
            if method == 'orm':
                # 3 removes the record of id from the set
                # 4 adds an existing record of id to the set
                commands = [
                    (3, source_id) for source_id in self.ids if source_id != dest_id
                ]
                commands.append((4, dest_id))
                records.write({field_name: commands})

    def _invalidate_field_cache(self, records, field_name):
        """Drop cached values of ``field_name`` on ``records`` after raw SQL."""
        if hasattr(records, 'invalidate_recordset'):
            # Newer ORM API.
            records.invalidate_recordset([field_name])
        else:
            # Older ORM API.
            records.invalidate_cache([field_name], records.ids)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment