Created
May 10, 2024 06:31
-
-
Save chapmanjacobd/d7044f9c227780e7fd073d901daf5d3b to your computer and use it in GitHub Desktop.
Filter JSON based on example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| import sys | |
| from typing import Any, Dict | |
| import ijson | |
| import ijson.common | |
| from xklb.utils import argparse_utils | |
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Positional arguments:
        example_file: path (string) of the JSON file holding the example(s).
        input_file: optional; opened for reading in text mode, defaults to
            ``sys.stdin``.
        output_file: optional; opened in append mode, defaults to
            ``sys.stdout``.

    Returns:
        Whatever ``argparse_utils.ArgumentParser.parse_args()`` returns
        (presumably an ``argparse.Namespace`` -- confirm against xklb).
    """
    cli = argparse_utils.ArgumentParser(
        description='Stream JSON data from a file and use an example to filter to output',
    )

    # The example is passed as a path; the streams are opened by argparse.
    cli.add_argument('example_file', type=str)
    cli.add_argument(
        "input_file",
        nargs="?",
        type=argparse.FileType("r"),
        default=sys.stdin,
    )
    cli.add_argument(
        'output_file',
        nargs='?',
        type=argparse.FileType('a'),
        default=sys.stdout,
    )

    return cli.parse_args()
def _match_score(example: Dict[str, Any], candidate: Dict[str, Any]) -> float:
    """Return the fraction of ``example``'s key/value pairs found in ``candidate``.

    An empty ``example`` scores 0, so it can never match anything.
    """
    if not example:
        return 0
    hits = sum(
        1
        for key, value in example.items()
        if key in candidate and candidate[key] == value
    )
    return hits / len(example)


def _iter_list_dicts(data: Any):
    """Yield, in document order, every dict that is a direct element of a list.

    Dict *values* that are dicts or lists are descended into.  A dict found as
    a list element is yielded as a candidate, but its own contents are not
    searched any further.  A dict that is not inside a list (for example the
    top-level document itself) is never yielded.
    """
    if isinstance(data, dict):
        for value in data.values():
            if isinstance(value, (dict, list)):
                yield from _iter_list_dicts(value)
    elif isinstance(data, list):
        for item in data:
            if isinstance(item, dict):
                yield item
            elif isinstance(item, list):
                yield from _iter_list_dicts(item)


def find_items_in_json(example_data, json_data):
    """Yield, for each example, the best-matching dict found in ``json_data``.

    Candidates are the dicts that appear as elements of lists inside
    ``json_data``.  Each candidate is scored by the fraction of the example's
    key/value pairs it contains with equal values.  The highest score wins;
    on a tie the candidate with the shorter ``str()`` representation is
    preferred, and among equally short ones the first encountered is kept.

    Args:
        example_data: a single example dict, or an iterable of example dicts.
            Items that are not dicts are skipped instead of raising
            ``AttributeError`` part-way through the search.
        json_data: the decoded JSON document (dicts, lists and scalars).

    Yields:
        At most one dict per example, in example order.  Nothing is yielded
        for an example that has no candidate sharing at least one key/value
        pair with it.
    """
    examples = [example_data] if isinstance(example_data, dict) else example_data

    for example_item in examples:
        if not isinstance(example_item, dict):
            continue

        best_score = 0
        best_match = None
        best_len = None  # len(str(best_match)); computed lazily, only on ties

        for candidate in _iter_list_dicts(json_data):
            score = _match_score(example_item, candidate)
            # A zero score is never a match, so it must not take part in the
            # tie-break below while no real match has been found yet.
            if score <= 0 or score < best_score:
                continue

            if score > best_score:
                best_score, best_match, best_len = score, candidate, None
                continue

            # Equal scores: prefer the strictly shorter representation.
            if best_len is None:
                best_len = len(str(best_match))
            candidate_len = len(str(candidate))
            if candidate_len < best_len:
                best_match, best_len = candidate, candidate_len

        if best_match is not None:
            yield best_match
def stream_json(args, example_data):
    """Yield the example matches found in every JSON document of the input.

    ``args.input_file`` is handed to ``splitstream.splitfile`` with
    ``format="json"``; each chunk it produces is parsed with ``ijson.items``
    using the empty prefix, and every parsed object is searched with
    ``find_items_in_json``.

    Args:
        args: parsed command-line arguments; only ``input_file`` is read.
        example_data: the example(s) passed through to ``find_items_in_json``.

    Yields:
        Each item yielded by ``find_items_in_json``, in input order.
    """
    # Imported here rather than at module level, as in the original.
    from splitstream import splitfile

    for chunk in splitfile(args.input_file, format="json"):
        for document in ijson.items(chunk, ''):
            for match in find_items_in_json(example_data, document):
                yield match
def main():
    """Run the filter: load the example, stream the input, print the matches.

    The example file named by ``args.example_file`` is read as JSON.  Every
    match produced by ``stream_json`` is serialized on a single line and
    printed to ``args.output_file``.
    """
    args = parse_args()

    # Use a context manager so the example file is closed once it is parsed,
    # instead of leaking the handle until interpreter shutdown.
    with open(args.example_file) as example_fh:
        example_data = json.load(example_fh)

    for d in stream_json(args, example_data):
        json_str = json.dumps(d, indent=None)
        print(json_str, file=args.output_file)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment