Last active
April 16, 2025 17:55
-
-
Save jorendorff/dc28adcdabd85a4505ce8c2dcc2d5c3a to your computer and use it in GitHub Desktop.
Incremental JSON parser in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unicodedata | |
# Sentinel meaning "parsing has not finished yet".
class TimedOut:
    """Type of the TimedOut sentinel; the name is rebound to its only instance below."""

    def __repr__(self):
        return "TimedOut"


# Replace the class with a single instance so callers can test `x is TimedOut`.
TimedOut = TimedOut()
# Single-character escapes: maps the character that follows a backslash
# inside a JSON string to the character it stands for.
BASIC_ESCAPES = {
    "\"": "\"",  # quotation mark
    "\\": "\\",  # backslash
    "/": "/",    # solidus
    "b": "\b",   # backspace
    "f": "\f",   # form feed
    "n": "\n",   # line feed
    "r": "\r",   # carriage return
    "t": "\t",   # horizontal tab
}
class Parser:
    """Incremental, non-recursive JSON parser over an in-memory string.

    Parsing advances in small steps through parse_some(), so a caller can
    interleave it with other work. Containers that are still open are kept
    on an explicit stack instead of the Python call stack.

    Attributes:
        s: the source string being parsed.
        i: current read position within `s`.
        stack: still-open containers, innermost last. An open array is the
            list of elements read so far. An open object is a tuple
            `(dict, key)`, where `key` is the key whose value is expected
            next, or None once the next character is known to be '}'.

    All syntax errors are reported as ValueError with the current offset.
    """

    # The only characters JSON allows between tokens.
    _WHITESPACE = " \t\n\r"
    _DIGITS = "0123456789"
    _HEX_DIGITS = "0123456789abcdefABCDEF"

    def __init__(self, s):
        self.s = s        # source string that we're parsing
        self.i = 0        # current read position within the string
        self.stack = []   # stack of still-open objects and arrays

    def skip_ws(self):
        """Advance `i` past any JSON whitespace (space, tab, LF, CR)."""
        s = self.s
        end = len(s)
        while self.i < end and s[self.i] in self._WHITESPACE:
            self.i += 1

    def fail(self, format, *args):
        """Raise ValueError with a message that includes the current offset.

        `format` is expanded with str.format only when `args` are given, so
        plain messages may contain literal '{' and '}' characters.
        """
        message = format.format(*args) if args else format
        raise ValueError("{} (at offset {})".format(message, self.i))

    def _skip_digits(self):
        """Advance past a run of ASCII digits and return how many were skipped."""
        s = self.s
        end = len(s)
        begin = self.i
        while self.i < end and s[self.i] in self._DIGITS:
            self.i += 1
        return self.i - begin

    def _hex4(self, pos):
        """Return the value of the four hex digits at `pos`, or None if absent/invalid."""
        digits = self.s[pos:pos + 4]
        # Check each digit explicitly: int() alone would also accept
        # underscores, signs and non-ASCII digits.
        if len(digits) == 4 and all(d in self._HEX_DIGITS for d in digits):
            return int(digits, 16)
        return None

    def parse_number(self):
        """Parse the JSON number starting at `i` and return an int or float.

        A number with a fraction or exponent becomes a float, anything else
        an int. Only one leading zero is consumed, so input such as "012"
        is reported by the caller as unexpected trailing characters.
        """
        s = self.s
        end = len(s)
        start = self.i
        if s[self.i] == '-':
            self.i += 1
        if self.i < end and s[self.i] == '0':
            self.i += 1
        elif not self._skip_digits():
            self.fail("expected digit in number")
        is_float = False
        if self.i < end and s[self.i] == '.':
            self.i += 1
            if not self._skip_digits():
                self.fail("expected digit after '.' in number")
            is_float = True
        if self.i < end and s[self.i] in "eE":
            self.i += 1
            if self.i < end and s[self.i] in "+-":
                self.i += 1
            if not self._skip_digits():
                self.fail("expected digit in exponent of number")
            is_float = True
        text = s[start:self.i]
        return float(text) if is_float else int(text)

    def parse_string(self):
        """Parse the string literal whose opening quote is at `i`; return its value.

        Handles the single-character escapes in BASIC_ESCAPES and four-digit
        hexadecimal unicode escapes; an escaped high surrogate immediately
        followed by an escaped low surrogate is combined into one character.
        Raises ValueError for raw control characters (below U+0020), unknown
        or malformed escapes, and a missing closing quote.
        """
        assert self.s[self.i] == '"'
        s = self.s
        end = len(s)
        self.i += 1
        pieces = []  # joined once at the end; avoids repeated string concatenation
        while self.i < end:
            c = s[self.i]
            self.i += 1
            if c == '"':
                return "".join(pieces)
            elif c == '\\':
                if self.i >= end:
                    break
                esc = s[self.i]
                self.i += 1
                if esc == 'u':
                    if self.i + 4 > end:
                        break  # input ends inside the escape: unterminated
                    code = self._hex4(self.i)
                    if code is None:
                        self.fail("invalid \\u escape")
                    self.i += 4
                    if 0xD800 <= code <= 0xDBFF and s.startswith('\\u', self.i):
                        # Possible surrogate pair: merge only if the next
                        # escape is a valid low surrogate; otherwise leave it
                        # for the main loop to handle (or reject).
                        low = self._hex4(self.i + 2)
                        if low is not None and 0xDC00 <= low <= 0xDFFF:
                            code = 0x10000 + ((code - 0xD800) << 10) + (low - 0xDC00)
                            self.i += 6
                    pieces.append(chr(code))
                elif esc in BASIC_ESCAPES:
                    pieces.append(BASIC_ESCAPES[esc])
                else:
                    self.fail("invalid escape sequence in string")
            elif c < ' ':
                # JSON forbids only U+0000..U+001F unescaped inside strings.
                self.fail("unexpected control character in string")
            else:
                pieces.append(c)
        self.fail("unterminated string literal")

    def parse_key(self):
        """Parse an object key and the ':' after it; return the key string."""
        self.skip_ws()
        if self.i >= len(self.s) or self.s[self.i] != '"':
            self.fail("expected key in object")
        key = self.parse_string()
        self.skip_ws()
        if self.i >= len(self.s) or self.s[self.i] != ':':
            self.fail("expected ':' after key in object")
        self.i += 1
        return key

    def parse_one_value(self):
        """Parse until one value is complete and return it.

        Opening brackets push a new container on the stack and parsing
        continues; the call returns when a scalar has been read or when a
        closing bracket pops a finished container. The caller is responsible
        for storing the returned value into the enclosing container, if any.
        """
        s = self.s
        while True:
            self.skip_ws()
            if self.i >= len(s):
                if self.stack:
                    self.fail("unexpected end of input in {}",
                              "object" if isinstance(self.stack[-1], tuple) else "array")
                else:
                    self.fail("input was all whitespace")
            c = s[self.i]
            if c in "-0123456789":
                return self.parse_number()
            elif c == '"':
                return self.parse_string()
            elif c == 't' and s.startswith("true", self.i):
                self.i += 4
                return True
            elif c == 'f' and s.startswith("false", self.i):
                self.i += 5
                return False
            elif c == 'n' and s.startswith("null", self.i):
                self.i += 4
                return None
            elif c == '{':
                self.i += 1
                self.skip_ws()
                if self.i < len(s) and s[self.i] == '}':
                    self.i += 1
                    return {}
                else:
                    key = self.parse_key()
                    self.stack.append(({}, key))
                    continue
            elif c == '}':
                if not self.stack:
                    self.fail("unexpected character {!r}", c)
                self.i += 1
                top = self.stack.pop()
                if not isinstance(top, tuple):
                    self.fail("found '}' at end of array, expected ']'")
                obj, key = top
                if key is not None:
                    # A key and ':' were read but no value followed.
                    self.fail("expected value after ':' in object")
                return obj
            elif c == '[':
                self.i += 1
                self.stack.append([])
                continue
            elif c == ']':
                if not self.stack:
                    self.fail("unexpected character {!r}", c)
                self.i += 1
                top = self.stack.pop()
                if not isinstance(top, list):
                    self.fail("found ']' at end of object, expected '}'")
                return top
            else:
                self.fail("unexpected character {!r}", c)

    def store_value(self, v):
        """Store `v` into the innermost open container and consume the separator.

        For an object, `v` is stored under the pending key; a following ','
        causes the next key to be parsed, while a following '}' is left in
        place and the pending key is cleared. For an array, `v` is appended;
        a following ',' is consumed and a following ']' is left in place.
        """
        top = self.stack[-1]
        if isinstance(top, tuple):
            obj, key = top
            obj[key] = v
            self.skip_ws()
            if self.i >= len(self.s):
                self.fail("unmatched '{'")
            elif self.s[self.i] == ',':
                self.i += 1
                key = self.parse_key()
                self.stack[-1] = obj, key
            elif self.s[self.i] == '}':
                self.stack[-1] = obj, None
            else:
                self.fail("expected ',' or '}' after key-value pair in object")
        else:
            assert isinstance(top, list)
            top.append(v)
            self.skip_ws()
            if self.i >= len(self.s):
                self.fail("unmatched '['")
            elif self.s[self.i] == ',':
                self.i += 1
                self.skip_ws()
                # Reject a trailing comma here; otherwise the ']' would be
                # accepted as a normal end of array.
                if self.i < len(self.s) and self.s[self.i] == ']':
                    self.fail("expected value after ',' in array")
            elif self.s[self.i] == ']':
                pass
            else:
                self.fail("expected ',' or ']' after array element")

    def finish(self):
        """Check that only whitespace remains after the top-level value."""
        self.skip_ws()
        if self.i < len(self.s):
            self.fail("unexpected extra data after JSON")

    def parse_some(self, n=1):
        """Run up to `n` parsing steps.

        Each step completes one value (a scalar or a just-closed container).
        Returns the top-level value once the whole document has been parsed,
        or the TimedOut sentinel if `n` steps were not enough.
        """
        for _i in range(n):
            v = self.parse_one_value()
            if self.stack:
                self.store_value(v)
            else:
                self.finish()
                return v
        return TimedOut
def parse(s):
    """Parse the JSON document in string `s` and return the resulting value.

    Drives a Parser one step at a time until it produces something other
    than the TimedOut sentinel. Errors raised by the Parser propagate.
    """
    parser = Parser(s)
    while True:
        outcome = parser.parse_some()
        if outcome is not TimedOut:
            return outcome
def test():
    """Check parse() against json.loads on a fixed list of sample documents.

    Prints each sample followed by " ok"; raises ValueError on the first
    sample whose result differs from json.loads in value or in type.
    """
    import json

    samples = [
        '{}',
        '{"0": 1}',
        ' []',
        '{"dexys": {"midnight":"runners"}}',
        'true',
        'false',
        'null',
        '-1',
        '1',
        '1e300',
        '-1e300',
        '1e9999',
        '-1e9999',
        '-0',
        '[true, false, null, -1, 1, 1e300, 1e9999]',
        '[' * 100 + ']' * 100,
    ]

    def strict_eq(a, b):
        # Equality that also requires identical types at every level, so
        # that e.g. 1, 1.0 and True are all told apart.
        kind = type(b)
        if type(a) is not kind:
            return False
        if kind is dict:
            got = list(a.items())
            want = list(b.items())
            if len(got) != len(want):
                return False
            # Pairs are compared positionally, so key order matters too.
            for (got_key, got_val), (want_key, want_val) in zip(got, want):
                if not (strict_eq(got_key, want_key) and strict_eq(got_val, want_val)):
                    return False
            return True
        if kind is list:
            if len(a) != len(b):
                return False
            for got_item, want_item in zip(a, b):
                if not strict_eq(got_item, want_item):
                    return False
            return True
        if kind is float:
            return repr(a) == repr(b)  # distinguishes NaN, infinities, negative zero
        return a == b

    for source in samples:
        print(source)
        if not strict_eq(parse(source), json.loads(source)):
            raise ValueError("assertion failed")
        print(" ok")


test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment