Example:
import collections.abc
# Existing settings: one admin and one default-permission entry.
current_lake_configuration = {
    'DataLakeAdmins': [{'DataLakePrincipalIdentifier': 'Test1'}],
    'CreateDatabaseDefaultPermissions': [
        {
            'Principal': {'DataLakePrincipalIdentifier': 'id1'},
            'Permissions': ['ALL1'],
        },
    ],
}
# Incoming settings: a different admin and two default-permission entries,
# the second of which names the same principal ('id1') as the existing one.
new_lake_configuration = {
    'DataLakeAdmins': [{'DataLakePrincipalIdentifier': 'Test2'}],
    'CreateDatabaseDefaultPermissions': [
        {
            'Principal': {'DataLakePrincipalIdentifier': 'id2'},
            'Permissions': ['ALL12', 'ALL3'],
        },
        {
            'Principal': {'DataLakePrincipalIdentifier': 'id1'},
            'Permissions': ['ALL1_UPDATE'],
        },
    ],
}
def dict_merge(*args, add_keys=True):
    """Recursively merge two or more dicts and return the result as a new dict.

    The first positional argument is the base; each following dict is merged
    into a shallow copy of it, left to right. For every incoming key:

    * if the current value is missing or falsy (``None``, ``0``, ``''``, an
      empty container), the incoming value is stored as-is, with no type check;
    * if both values are dicts, they are merged recursively;
    * if both values are lists, incoming items that are not already present
      (compared with ``==``) are appended in order;
    * otherwise the incoming value replaces the current one.

    None of the input dicts, nor any list or dict nested inside them, is
    modified. Values that are not merged are shared by reference with the
    inputs rather than copied.

    Args:
        *args: Two or more dicts; the first is the base.
        add_keys: When ``False``, keys of an incoming dict that the dict being
            merged into does not already have are skipped. The flag is passed
            down to nested merges.

    Returns:
        A new dict holding the merged content.

    Raises:
        AssertionError: If fewer than two dicts are given (only while
            assertions are enabled).
        TypeError: If a key holds a truthy value in the merged result so far
            and the incoming value is of a different type.
    """
    assert len(args) >= 2, "dict_merge requires at least two dicts to merge"
    rtn_dct = args[0].copy()
    for merge_dct in args[1:]:
        if add_keys is False:
            # Keep only the keys the result already has.
            merge_dct = {key: value for key, value in merge_dct.items() if key in rtn_dct}
        for k, v in merge_dct.items():
            existing = rtn_dct.get(k)
            if not existing:
                # Missing or falsy current value: take the incoming one as-is.
                rtn_dct[k] = v
            elif type(v) is not type(existing):
                raise TypeError(f"Overlapping keys exist with different types: original is {type(rtn_dct[k])}, new value is {type(v)}")
            elif isinstance(existing, dict):
                # Same type on both sides, so v is a dict as well.
                rtn_dct[k] = dict_merge(existing, v, add_keys=add_keys)
            elif isinstance(v, list):
                # Build a fresh list instead of appending in place: `existing`
                # may be the very list object owned by one of the input dicts
                # (the top-level copy is shallow), and inputs must stay intact.
                combined = list(existing)
                for list_value in v:
                    if list_value not in combined:
                        combined.append(list_value)
                rtn_dct[k] = combined
            else:
                rtn_dct[k] = v
    return rtn_dct
# Merge the two example configurations; the merged dict is the call's return value.
dict_merge(current_lake_configuration, new_lake_configuration)
Results:
{
    "DataLakeAdmins": [
        {"DataLakePrincipalIdentifier": "Test1"},
        {"DataLakePrincipalIdentifier": "Test2"},
    ],
    "CreateDatabaseDefaultPermissions": [
        {
            "Principal": {"DataLakePrincipalIdentifier": "id1"},
            "Permissions": ["ALL1"],
        },
        {
            "Principal": {"DataLakePrincipalIdentifier": "id2"},
            "Permissions": ["ALL12", "ALL3"],
        },
        {
            "Principal": {"DataLakePrincipalIdentifier": "id1"},
            "Permissions": ["ALL1_UPDATE"],
        },
    ],
}