Skip to content

Instantly share code, notes, and snippets.

@kevinslin
Created February 9, 2024 23:17
Show Gist options
  • Save kevinslin/15e0c127d3706fdedd29fb432d31ea7d to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import sys
import json
from typing import Optional, Tuple
import pprint
from log_filters import filter_logs_by_source_type
class MergePolicy:
    """Generate merge policies for JSON log records that share a schema.

    A policy maps a field name (dotted "parent.child" for one-level-nested
    dict fields) to a list of action dicts such as ``{"action": "pull_up"}``,
    ``{"action": "group_by"}``, ``{"action": "merge", "strategy": ...}``,
    or ``{"action": "noop"}``.
    """

    # A field whose distinct-value count is below this fraction of the total
    # record count is considered low-cardinality and worth grouping by.
    # (Was a magic 0.8, misspelled "treshold", duplicated in three methods.)
    _GROUP_BY_THRESHOLD = 0.8

    def __init__(self, source_type: str, msg_field: str = "message", timestamp_field: str = "timestamp"):
        """
        Args:
            source_type: name of the field identifying the log source; its
                value is assumed identical across all records.
            msg_field: name of the message field (stored but unused here).
            timestamp_field: name of the timestamp field (stored but unused here).
        """
        self._source_type = source_type
        self._msg_field = msg_field
        self._timestamp_field = timestamp_field

    def _gen_policy_for_list_field(self, field: str, json_logs: list[dict]) -> list[dict]:
        """Generate the policy for a list-typed top-level field."""
        # Lists are unhashable, so distinct values are compared via str().
        unique_values = {str(record[field]) for record in json_logs}
        if len(unique_values) == 1:
            return [{"action": "pull_up"}, {"action": "merge", "strategy": "discard"}]
        elif len(unique_values) < self._GROUP_BY_THRESHOLD * len(json_logs):
            return [{"action": "pull_up"}, {"action": "group_by"}]
        else:
            return [{"action": "noop"}]

    def _encode_fields(self, fields: list[str]) -> str:
        """Join a field path into a dotted string, e.g. ["a", "b"] -> "a.b"."""
        return ".".join(fields)

    def _decode_fields(self, fields_str: str) -> list[str]:
        """Split a dotted field path back into its components."""
        return fields_str.strip().split('.')

    def _gen_policy_for_dict_field(self, field: str, json_logs: list[dict]) -> dict:
        """Generate policies for a dict-typed field. Only goes one level down.

        Returns:
            A mapping from dotted "parent.child" field names to action lists.
        """
        policy = {}
        sample_log = json_logs[0]
        for child_field in sample_log[field].keys():
            # Infer the child's type from the sample record: unhashable
            # values (list/dict) must be compared via their str() form.
            serialize = isinstance(sample_log[field][child_field], (list, dict))
            unique_values = set()
            for record in json_logs:
                val = record[field][child_field]
                unique_values.add(str(val) if serialize else val)
            merged_key = self._encode_fields([field, child_field])
            if len(unique_values) == 1:
                policy[merged_key] = [{"action": "pull_up"}, {"action": "merge", "strategy": "discard"}]
            elif len(unique_values) < self._GROUP_BY_THRESHOLD * len(json_logs):
                policy[merged_key] = [{"action": "pull_up"}, {"action": "group_by"}]
            else:
                policy[merged_key] = [{"action": "noop"}]
        return policy

    def _gen_policy_for_non_obj_field(self, field: str, json_logs: list[dict]) -> list[dict]:
        """Generate the policy for a scalar (non-list, non-dict) field."""
        unique_values = {record[field] for record in json_logs}
        if len(unique_values) == 1:
            return [{"action": "merge", "strategy": "discard"}]
        elif len(unique_values) < self._GROUP_BY_THRESHOLD * len(json_logs):
            return [{"action": "group_by"}]
        else:
            return [{"action": "merge", "strategy": "array"}]

    def gen_merge_policy(self, json_logs: list[dict]) -> dict:
        """Generate a merge policy for json_logs sharing the same schema.

        Args:
            json_logs: list of dict records assumed to share one schema;
                the first record is used as the schema sample.

        Returns:
            A dict mapping field names to action lists. An empty input is
            returned unchanged (i.e. the empty list itself).
        """
        if not json_logs:
            return json_logs
        policy = {}
        # Partition top-level fields by value type, sampling the first record.
        dict_fields = []
        list_fields = []
        non_obj_fields = []
        sample_log = json_logs[0]
        for field in sample_log:
            if field == self._source_type:
                # Values of source_type are supposed to be the same for all
                # logs, so duplicates are simply discarded on merge.
                policy[field] = [{"action": "merge", "strategy": "discard"}]
            elif sample_log[field] is None:
                # None is treated as a scalar. (BUG FIX: removed a leftover
                # "import pdb; pdb.set_trace()" that halted here.)
                non_obj_fields.append(field)
            elif isinstance(sample_log[field], dict):
                dict_fields.append(field)
            elif isinstance(sample_log[field], list):
                list_fields.append(field)
            else:
                non_obj_fields.append(field)
        print("non_obj_fields: ", non_obj_fields)
        for field in non_obj_fields:
            policy[field] = self._gen_policy_for_non_obj_field(field, json_logs)
        print("list_fields: ", list_fields)
        for field in list_fields:
            policy[field] = self._gen_policy_for_list_field(field, json_logs)
        print("dict_fields: ", dict_fields)
        for field in dict_fields:
            nested_policy = self._gen_policy_for_dict_field(field, json_logs)
            policy.update(nested_policy)
        return policy
# CLI usage string for this script.
__usage = """python3 gen_merge_policy.py [source_type_field] [log_file.json]\n"""

if __name__ == "__main__":
    if len(sys.argv) != 3:
        # BUG FIX: previously the script only printed usage and kept
        # running, crashing with IndexError on sys.argv[1]; exit instead.
        print(__usage)
        sys.exit(1)
    source_type = sys.argv[1]
    json_filename = sys.argv[2]
    with open(json_filename, 'r') as f:
        json_logs = json.load(f)
    # Keep only dict records for the requested source type; non-dict
    # records (e.g. bare strings) cannot be merged by field.
    filtered_logs = [
        rec
        for rec in filter_logs_by_source_type(json_logs, source_type)
        if isinstance(rec, dict)
    ]
    merger = MergePolicy(source_type)
    policy = merger.gen_merge_policy(filtered_logs)
    pprint.pprint(policy)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment