Created
February 9, 2024 23:17
-
-
Save kevinslin/15e0c127d3706fdedd29fb432d31ea7d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import sys | |
import json | |
from typing import Optional, Tuple | |
import pprint | |
from log_filters import filter_logs_by_source_type | |
class MergePolicy: | |
def __init__(self, source_type: str, msg_field: str = "message", timestamp_field: str = "timestamp"): | |
self._source_type = source_type | |
self._msg_field = msg_field | |
self._timestamp_field = timestamp_field | |
def _gen_policy_for_list_field(self, field: str, json_logs: list[dict]) -> list[dict]: | |
#Generate the policy for list type field. | |
group_by_treshold = 0.8 | |
unique_values = set() | |
for record in json_logs: | |
unique_values.add(str(record[field])) | |
if len(unique_values) == 1: | |
return [{"action": "pull_up"}, {"action": "merge", "strategy": "discard"}] | |
elif len(unique_values) < group_by_treshold * len(json_logs): | |
return [{"action": "pull_up"}, {"action": "group_by"}] | |
else: | |
return [{"action": "noop"}] | |
def _encode_fields(self, fields: list[str]) -> str: | |
return ".".join(fields) | |
def _decode_fields(self, fields_str: str) -> list[str]: | |
return fields_str.strip().split('.') | |
def _gen_policy_for_dict_field(self, field: str, json_logs: list[dict]) -> dict: | |
#Generate the policy for dict type field. Only do one level down. | |
policy = {} | |
group_by_treshold = 0.8 | |
sample_log = json_logs[0] | |
child_fields = sample_log[field].keys() | |
for child_field in child_fields: | |
# Inference the type of the child_field | |
serialize_flag = False | |
if type(sample_log[field][child_field]) in {list, dict}: | |
serialize_flag = True | |
unique_values = set() | |
for record in json_logs: | |
val = record[field][child_field] | |
if serialize_flag: | |
unique_values.add(str(val)) | |
else: | |
unique_values.add(val) | |
merged_key = self._encode_fields([field, child_field]) | |
if len(unique_values) == 1: | |
policy[merged_key] = [{"action": "pull_up"}, {"action": "merge", "strategy": "discard"}] | |
elif len(unique_values) < group_by_treshold * len(json_logs): | |
policy[merged_key] = [{"action": "pull_up"}, {"action": "group_by"}] | |
else: | |
policy[merged_key] = [{"action": "noop"}] | |
return policy | |
def _gen_policy_for_non_obj_field(self, field: str, json_logs: list[dict]) -> list[dict]: | |
group_by_treshold = 0.8 | |
unique_values = set() | |
for record in json_logs: | |
unique_values.add(record[field]) | |
if len(unique_values) == 1: | |
return [{"action": "merge", "strategy": "discard"}] | |
elif len(unique_values) < group_by_treshold * len(json_logs): | |
return [{"action": "group_by"}] | |
else: | |
return [{"action": "merge", "strategy": "array"}] | |
def gen_merge_policy(self, json_logs: list[dict]) -> list[dict]: | |
#Generate merge policy (or strategy) for given json_logs with same schema. | |
if not json_logs: | |
return json_logs | |
policy = {} | |
#Cluster top level fields | |
dict_fields = [] | |
list_fields = [] | |
non_obj_fields = [] | |
sample_log = json_logs[0] | |
for field in sample_log: | |
if field == self._source_type: | |
# values of source_type are suppose to be same for all logs. | |
policy[field] = [{"action": "merge", "strategy": "discard"}] | |
elif sample_log[field] is None: | |
import pdb; pdb.set_trace() | |
non_obj_fields.append(field) | |
elif type(sample_log[field]) == dict: | |
dict_fields.append(field) | |
elif type(sample_log[field]) == list: | |
list_fields.append(field) | |
else: | |
non_obj_fields.append(field) | |
print("non_obj_fields: ", non_obj_fields) | |
for field in non_obj_fields: | |
policy[field] = self._gen_policy_for_non_obj_field(field, json_logs) | |
print("list_fields: ", list_fields) | |
for field in list_fields: | |
policy[field] = self._gen_policy_for_list_field(field, json_logs) | |
print("dict_fields: ", dict_fields) | |
for field in dict_fields: | |
nested_policy = self._gen_policy_for_dict_field(field, json_logs) | |
policy.update(nested_policy) | |
return policy | |
__usage = """python3 gen_merge_policy.py [source_type_field] [log_file.json]\n""" | |
if __name__ == "__main__": | |
if len(sys.argv) != 3: | |
print(__usage) | |
source_type = sys.argv[1] | |
json_filename = sys.argv[2] | |
json_logs = [] | |
with open(json_filename, 'r') as f: | |
json_logs = json.load(f) | |
filtered_logs = [] | |
for rec in filter_logs_by_source_type(json_logs, source_type): | |
if isinstance(rec, dict): | |
filtered_logs.append(rec) | |
merger = MergePolicy(source_type) | |
policy = merger.gen_merge_policy(filtered_logs) | |
pprint.pprint(policy) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment