Created
March 6, 2025 19:10
-
-
Save hartsock/37a8d5fb54af462580d200f88e87d3ac to your computer and use it in GitHub Desktop.
Tries to interpret arbitrary log data with embedded python and json objects as valid JSON. This is probably not safe. This really only understands the Python and JSON when they are on their own lines.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import collections.abc
import json
import numbers
import sys
###############################################################################
# Converts a stdin text stream of Python data and JSON data into pure JSON.
# This script should eventually get deleted.
# This script has NO smart flow control;
# it is designed to run with a stream of input data.
#
# Call:
#
# $ cat logs_to_json_example.txt | ./logs_to_json.py
#
###############################################################################
# When True, eval_data prints the errors it swallows to stderr.
debug: bool = False
def eval_data(line: str):
    """Interpret one line of text as Python data and convert it for JSON output.

    The line is evaluated as a Python expression and the result is converted
    on a best-effort basis; text that cannot be interpreted is passed through.

    Args:
        line: Raw text, for example one line read from stdin.

    Returns:
        * a dict with every value passed through ``_process`` when the line
          evaluates to a mapping (keys that JSON cannot represent are
          stringified);
        * a list with every element passed through ``_process`` when the
          line evaluates to any other iterable;
        * the number itself when the line evaluates to a number;
        * ``line`` unchanged when it cannot be evaluated, or evaluates to
          some other non-iterable value such as ``None``.
    """
    # SECURITY: eval() executes whatever expression the input contains.
    # Only feed this script trusted logs. If plain literals are all that
    # needs to be understood, ast.literal_eval is a safer choice.
    try:
        raw_data = eval(line)
    except Exception as error:
        # Arbitrary log text can fail in arbitrary ways (NameError,
        # SyntaxError, ZeroDivisionError, TypeError, ...). Any failure just
        # means "this is not Python data", so the text is kept as-is.
        if debug:
            print(error, file=sys.stderr)
        return line

    if isinstance(raw_data, collections.abc.Mapping):
        # Keep the values: iterating a dict directly would yield only keys.
        return {
            (
                key
                if key is None or isinstance(key, (str, int, float))
                else str(key)
            ): _process(value)
            for key, value in raw_data.items()
        }
    if isinstance(raw_data, collections.abc.Iterable):
        return [_process(raw_row) for raw_row in raw_data]
    if isinstance(raw_data, numbers.Number):
        return raw_data
    # Non-iterable, non-numeric result (None, a module, a function, ...):
    # there is nothing to convert, so hand back the original text.
    return line
def _process(row): | |
if isinstance(row, str): | |
if '\\n' in str(row) or '\n' in str(row): | |
return _multiline_to_list(row) | |
return eval_data(row) | |
if isinstance(row, numbers.Number): | |
return row | |
if isinstance(row, collections.abc.Iterable): | |
out = [] | |
for item in row: | |
processed = _process(item) | |
out.append(processed) | |
return out | |
raise RuntimeError('unsupported data type: ' + row.__class__.__name__) | |
def _multiline_to_list(row): | |
split_row = str(row).rsplit('\n') | |
if len(split_row) == 1: | |
return split_row[0] | |
out = [] | |
for r in split_row: | |
if len(r): | |
out.append(_interpret_json(r)) | |
return out | |
def _interpret_json(line): | |
try: | |
data = json.loads(line) | |
return data | |
except json.decoder.JSONDecodeError: | |
return eval_data(line) | |
def main():
    """Convert every line of stdin and print the results as one JSON array.

    All input is read before conversion starts; each line is passed through
    ``eval_data`` and the collected results are written to stdout with a
    two-space indent.
    """
    converted = [eval_data(raw_line) for raw_line in sys.stdin.readlines()]
    print(json.dumps(converted, indent=2))
# Run the conversion only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment