Created
December 24, 2020 09:21
-
-
Save 2minchul/7a6182597643934c5fb61fec7ab20480 to your computer and use it in GitHub Desktop.
fix truncated json string
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import json | |
class Ascii:
    """Integer byte values of the ASCII characters the bytes-based fixer compares against.

    Iterating over (or indexing) a ``bytes`` object yields ``int`` values, so
    these constants can be compared directly with individual bytes.
    """

    quote = b'"'[0]
    backslash = b'\\'[0]
    comma = b','[0]
    colon = b':'[0]
    dot = b'.'[0]
    l = b'l'[0]  # final byte of the literal ``null``
def fix_truncated_json_bytes(data: bytes) -> bytes:
    """Repair UTF-8 encoded JSON that was cut off at an arbitrary byte.

    The input is scanned once while tracking the open containers and what the
    JSON grammar expects next; the missing tail is then synthesised:

    * an unterminated string is closed, after dropping a half-written escape
      sequence (lone backslash, partial unicode escape) or a multi-byte
      UTF-8 character that was cut in the middle;
    * a cut-off literal is completed (``tr`` -> ``true``) and a number ending
      in ``.``, ``-``, ``+`` or an exponent marker gets a ``0`` appended;
    * a dangling comma is removed and a key without a value receives ``null``;
    * every container that is still open is closed, innermost first.

    Args:
        data: Prefix of a JSON document.

    Returns:
        The repaired document. Empty input is returned unchanged. Input that
        is not a prefix of valid JSON is repaired on a best-effort basis and
        may still fail to parse.
    """
    # What the grammar expects after the most recent token outside a string.
    START, AFTER_OPEN, AFTER_COMMA, AFTER_KEY, AFTER_COLON, AFTER_VALUE = range(6)
    quote, backslash, comma, colon = ord('"'), ord('\\'), ord(','), ord(':')
    open_brace, letter_u = ord('{'), ord('u')
    whitespace = b' \t\n\r'

    stack = []              # opening brackets (as ints) of unclosed containers
    state = START
    in_string = False
    string_is_key = False   # whether the current/last string is an object key
    escape_start = None     # index of the backslash of an unfinished escape
    pending_hex = 0         # hex digits still missing from a unicode escape
    scalar_start = None     # index where the current number/literal began

    for i, char in enumerate(data):
        if in_string:
            if pending_hex:
                pending_hex -= 1
                if not pending_hex:
                    escape_start = None
            elif escape_start is not None:
                # Byte right after a backslash: only "u" needs more input.
                if char == letter_u:
                    pending_hex = 4
                else:
                    escape_start = None
            elif char == backslash:
                escape_start = i
            elif char == quote:
                in_string = False
                state = AFTER_KEY if string_is_key else AFTER_VALUE
            continue

        if char == quote:
            in_string = True
            string_is_key = (
                bool(stack)
                and stack[-1] == open_brace
                and state in (AFTER_OPEN, AFTER_COMMA)
            )
            scalar_start = None
        elif char in b'[{':
            stack.append(char)
            state = AFTER_OPEN
            scalar_start = None
        elif char in b'}]':
            if stack:  # tolerate an unmatched closer instead of raising
                stack.pop()
            state = AFTER_VALUE
            scalar_start = None
        elif char == comma:
            state = AFTER_COMMA
            scalar_start = None
        elif char == colon:
            state = AFTER_COLON
            scalar_start = None
        elif char in whitespace:
            scalar_start = None
        elif scalar_start is None:
            # First byte of a number or of true/false/null.
            scalar_start = i
            state = AFTER_VALUE

    repaired = bytes(data)
    if in_string:
        if escape_start is not None:
            repaired = repaired[:escape_start]
        else:
            repaired = _drop_partial_utf8_tail(repaired)
        repaired += b'"'
        state = AFTER_KEY if string_is_key else AFTER_VALUE
    elif scalar_start is not None:
        token = repaired[scalar_start:]
        for literal in (b'true', b'false', b'null'):
            if literal.startswith(token):
                repaired += literal[len(token):]
                break
        else:
            if token[-1] in b'.-+eE':  # e.g. b"[2.1, 0."
                repaired += b'0'

    if state == AFTER_COMMA:
        # Only whitespace can follow the comma here; drop both.
        repaired = repaired.rstrip(whitespace)[:-1]
    elif state == AFTER_KEY:
        repaired += b':null'
    elif state == AFTER_COLON:
        repaired += b'null'

    # bracket + 2 converts '{' and '[' into '}' and ']'
    return repaired + bytes(bracket + 2 for bracket in reversed(stack))


def _drop_partial_utf8_tail(raw: bytes) -> bytes:
    """Return *raw* without a trailing multi-byte UTF-8 sequence that was cut short."""
    for back in range(1, min(4, len(raw)) + 1):
        byte = raw[-back]
        if byte < 0x80:  # ASCII byte: no sequence is pending
            return raw
        if byte >= 0xC0:  # lead byte: its value tells the sequence length
            expected = 2 if byte < 0xE0 else 3 if byte < 0xF0 else 4
            return raw[:-back] if back < expected else raw
    return raw
def fix_truncated_json(data: str) -> str:
    """Repair a JSON document that was cut off at an arbitrary position.

    The text is scanned once while tracking the open containers and what the
    JSON grammar expects next; the missing tail is then synthesised:

    * an unterminated string is closed, after dropping a half-written escape
      sequence (lone backslash or partial unicode escape);
    * a cut-off literal is completed (``tr`` -> ``true``) and a number ending
      in ``.``, ``-``, ``+`` or an exponent marker gets a ``0`` appended;
    * a dangling comma is removed and a key without a value receives ``null``;
    * every container that is still open is closed, innermost first.

    Args:
        data: Prefix of a JSON document.

    Returns:
        The repaired text. Empty input is returned unchanged. Input that is
        not a prefix of valid JSON is repaired on a best-effort basis and may
        still fail to parse.
    """
    # What the grammar expects after the most recent token outside a string.
    START, AFTER_OPEN, AFTER_COMMA, AFTER_KEY, AFTER_COLON, AFTER_VALUE = range(6)
    closers = {'{': '}', '[': ']'}
    whitespace = ' \t\n\r'

    stack = []              # opening brackets of containers not yet closed
    state = START
    in_string = False
    string_is_key = False   # whether the current/last string is an object key
    escape_start = None     # index of the backslash of an unfinished escape
    pending_hex = 0         # hex digits still missing from a unicode escape
    scalar_start = None     # index where the current number/literal began

    for i, char in enumerate(data):
        if in_string:
            if pending_hex:
                pending_hex -= 1
                if not pending_hex:
                    escape_start = None
            elif escape_start is not None:
                # Character right after a backslash: only "u" needs more input.
                if char == 'u':
                    pending_hex = 4
                else:
                    escape_start = None
            elif char == '\\':
                escape_start = i
            elif char == '"':
                in_string = False
                state = AFTER_KEY if string_is_key else AFTER_VALUE
            continue

        if char == '"':
            in_string = True
            string_is_key = (
                bool(stack)
                and stack[-1] == '{'
                and state in (AFTER_OPEN, AFTER_COMMA)
            )
            scalar_start = None
        elif char in '[{':
            stack.append(char)
            state = AFTER_OPEN
            scalar_start = None
        elif char in '}]':
            if stack:  # tolerate an unmatched closer instead of raising
                stack.pop()
            state = AFTER_VALUE
            scalar_start = None
        elif char == ',':
            state = AFTER_COMMA
            scalar_start = None
        elif char == ':':
            state = AFTER_COLON
            scalar_start = None
        elif char in whitespace:
            scalar_start = None
        elif scalar_start is None:
            # First character of a number or of true/false/null.
            scalar_start = i
            state = AFTER_VALUE

    repaired = data
    if in_string:
        if escape_start is not None:
            repaired = repaired[:escape_start]
        repaired += '"'
        state = AFTER_KEY if string_is_key else AFTER_VALUE
    elif scalar_start is not None:
        token = data[scalar_start:]
        for literal in ('true', 'false', 'null'):
            if literal.startswith(token):
                repaired += literal[len(token):]
                break
        else:
            if token[-1] in '.-+eE':  # e.g. "[2.1, 0."
                repaired += '0'

    if state == AFTER_COMMA:
        # Only whitespace can follow the comma here; drop both.
        repaired = repaired.rstrip(whitespace)[:-1]
    elif state == AFTER_KEY:
        repaired += ':null'
    elif state == AFTER_COLON:
        repaired += 'null'

    return repaired + ''.join(closers[bracket] for bracket in reversed(stack))
if __name__ == '__main__':
    # Smoke test: cut a sample document at each of its last 50 positions and
    # check that every repaired prefix parses as JSON.
    broken = (
        '[{"category":"expr","points":[[0.58333,0.75391],[0.64236,0.75391],'
        '[0.64236,0.77148],[0.58333,0.77148]]},{"boxes":[{"bottom":0.76875,"box_type":"text_char","char":"나",'
        '"left":0.6537,"right":0.67407,"space":false,"top":0.75677},{"bottom":0.76875,"box_type":"text_char",'
        '"char":"머","left":0.675,"right":0.69167,"space":false,"top":0.75677},{"bottom":0.76875,'
        '"box_type":"text_char","char":"지","left":0.69537,"'
    )
    full_length = len(broken)
    for end in range(full_length, full_length - 50, -1):
        truncated = broken[:end]
        repaired = fix_truncated_json(truncated)
        print(f'{truncated[-20:]} -> {repaired[-20:]}')
        json.loads(repaired)  # raises json.JSONDecodeError if the repair is invalid
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment