|
from datetime import datetime |
|
from typing import Any, Dict, List |
|
|
|
# Painless source shared by every call. Field names and values travel in
# ``params`` instead of being interpolated into the source. This means
# Elasticsearch compiles and caches this one script, and field names that are
# not valid Painless identifiers (e.g. containing "-") cannot break the script
# or inject code into it. Field names are used as top-level ``_source`` keys;
# dotted paths are not traversed.
_UPDATE_FIELDS_SCRIPT = """
for (def name : params['field_names']) {
    def value = params['field_values'][name];
    if (value != null) {
        ctx._source[name] = value;
    } else {
        ctx._source.remove(name);
    }
}
"""

# Script query that checks how many values an array field has indexed.
_FIELD_SIZE_SCRIPT = "doc[params['field']].size() == params['count']"


def _serialise_value(value: Any) -> Any:
    """Return ``value`` as an ISO-8601 string if it is a ``datetime``, else unchanged."""
    return value.isoformat() if isinstance(value, datetime) else value


def _field_up_to_date_clauses(field: str, value: Any) -> List[Dict]:
    """Return query clauses that all match when ``field`` already holds ``value``.

    - ``None`` or an empty list/tuple: the field must have no indexed values.
    - Non-empty list/tuple: every distinct element must be present, and the
      number of indexed values must equal the number of distinct elements.
      Together these amount to set equality, so element order and repeated
      elements are not compared.
    - Anything else: a ``match_phrase`` on the value.
    """
    is_sequence = isinstance(value, (list, tuple))

    if value is None or (is_sequence and not value):
        # Nothing will be stored, so the document is current when the field
        # has no values. An empty ``terms`` list would match nothing and force
        # a rewrite on every call.
        return [{"bool": {"must_not": [{"exists": {"field": field}}]}}]

    if is_sequence:
        # Order-preserving de-duplication that needs only ``==``, not hashing.
        distinct = []
        for item in value:
            if item not in distinct:
                distinct.append(item)

        # One ``term`` per element gives AND semantics. A single ``terms``
        # query matches when ANY element is present, which would treat
        # partially overlapping arrays as identical.
        clauses = [{"term": {field: item}} for item in distinct]

        # Compare against the distinct count: keyword doc values are
        # de-duplicated, so len(value) would let e.g. ["a", "a"] look equal to
        # a stored ["a", "b"].
        clauses.append(
            {
                "script": {
                    "script": {
                        "source": _FIELD_SIZE_SCRIPT,
                        "params": {"field": field, "count": len(distinct)},
                    }
                }
            }
        )
        return clauses

    # NOTE(review): on an analysed text field, match_phrase also matches
    # documents whose value merely *contains* this phrase (e.g. "John Smith Jr"
    # for "John Smith"). Such documents are treated as up to date and skipped.
    # A term query on a keyword (sub)field would be exact; confirm the target
    # mappings. Object (dict) values are presumably rejected here by
    # Elasticsearch.
    return [{"match_phrase": {field: value}}]


def _build_up_to_date_clause(params: Dict[str, Any]) -> Dict:
    """Return a bool query matching documents whose fields already equal ``params``."""
    must = []
    for field, value in params.items():
        must.extend(_field_up_to_date_clauses(field, value))
    return {"bool": {"must": must}}


def update_related_documents(
    match_field: str,
    match_value: Any,
    new_data: Dict,
    fields_to_update: List[str],
    target_index: str,
) -> int:
    """
    Update documents in ``target_index`` whose ``match_field`` equals
    ``match_value``, copying the values of ``fields_to_update`` from ``new_data``.

    Documents that already hold the new values are excluded from the
    update-by-query, so they are not rewritten or counted.

    Parameters:
        match_field: The field used to identify related documents (e.g. "username").
        match_value: The value to match for the specified field (``term`` query).
        new_data: A dictionary containing the new data to update the documents with.
            Top-level ``datetime`` values are sent as ISO-8601 strings.
        fields_to_update: The fields to copy from ``new_data`` into each matching
            document. A field that is missing from ``new_data`` or is ``None``
            is removed from the target documents.
        target_index: The name of the index containing the documents to update.

    Returns:
        The number of documents updated, or ``0`` if ``target_index`` does not exist.

    Raises:
        Any exception from the Elasticsearch client is printed and re-raised.
    """
    try:
        if not es.indices.exists(index=target_index):
            print(f"{target_index} index does not exist - skipping update for {match_value}")
            return 0

        # New value per field, in first-seen order; duplicate field names collapse.
        params = {
            field: _serialise_value(new_data.get(field)) for field in fields_to_update
        }

        update_body = {
            # Skip (rather than abort on) documents changed concurrently.
            "conflicts": "proceed",
            "script": {
                "source": _UPDATE_FIELDS_SCRIPT,
                "lang": "painless",
                "params": {
                    "field_names": list(params),
                    "field_values": params,
                },
            },
            "query": {
                "bool": {
                    "must": [{"term": {match_field: match_value}}],
                    "must_not": [_build_up_to_date_clause(params)],
                }
            },
        }

        result = es.update_by_query(index=target_index, body=update_body)
        return result["updated"]

    except Exception as e:
        print(f"Error updating documents for {match_field} {match_value}: {str(e)}")
        raise