Last active
October 18, 2021 17:28
-
-
Save outofmbufs/bcf3bccc1fe0bb0824871ec5e02cc60e to your computer and use it in GitHub Desktop.
python function to decode multiple JSON representations in a single file (stream)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The MIT License
#
# Copyright (c) 2019 Neil Webber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json
#
# A MultiDecoder is a subclass of json.JSONDecoder and adds:
#
#   sizelimit - Object __init__() parameter. If set, protects against an
#       "unbounded stream of unresolved JSON/garbage" attack by capping
#       the max size for buffered data. Default is no limit.
#
#   [exception] SkipAllError - tells loadmultijson that the string contained
#       only ignorable characters (e.g., whitespace), so it can eat them
#       rather than report them as a garbage tuple if they are trailing data.
#
class MultiDecoder(json.JSONDecoder): | |
def __init__(self, *, sizelimit=0): | |
self.__sizelimit = sizelimit | |
super().__init__() | |
def raw_decode(self, s): | |
if self.__sizelimit > 0 and len(s) > self.__sizelimit: | |
raise ValueError(f"JSON string exceeds limit {self.__sizelimit}") | |
stripped = s.lstrip() # because raw_decode doesn't allow | |
if len(stripped) == 0: | |
raise SkipAllError() | |
obj, n = super().raw_decode(stripped) | |
n += len(s) - len(stripped) # adjust to count the stripped chars | |
return obj, n | |
# Raised by a decoder to signal that the entire string should be
# discarded (e.g., it is all whitespace).
class SkipAllError(Exception):
    """Decoder signal: the whole input string is ignorable."""
def loadmultijson(readable, *, decoder=None):
    """Generate (obj, s) tuples from a stream of multiple JSON values.

    A generator that yields individual JSON objects from a stream
    containing multiple such strings, yielding a tuple (obj, str) as
    each JSON representation is parsed.

    For example if the input stream is:
        [1, 2, 3] {"k1": 42} 17

    three tuples will be generated:
        ([1, 2, 3], '[1, 2, 3]')
        ({'k1': 42}, '{"k1": 42}')
        (17, '17')

    If there is unparseable JSON in the stream it will be returned as
    the last tuple, as a 'garbage tuple' distinguished by the obj
    (first element) being None. For example:
        [1, 2, 3] ooops

    will yield:
        ([1, 2, 3], '[1, 2, 3]')
        (None, 'ooops')

    Be aware that any JSON syntax error in the stream will cause the
    entire rest of the stream to be consumed and an enormous garbage
    tuple to be returned, as the algorithm just tacks on more and more
    data so long as the current buffer will not parse. There is no
    attempt to discern where an invalid JSON object "looks like it
    ends". The MultiDecoder (the default decoder) can optionally
    enforce a sizelimit to (crudely) control this. An ambitious custom
    decoder could try to recover the parse stream, possibly knowing
    something about the underlying application.

    Arguments:
        readable -- anything that has an io style .read() method.
        decoder -- (keyword only) nominally a JSON decoder object but
            the only required method is raw_decode. By default a
            MultiDecoder is used. The decoder can raise SkipAllError to
            communicate that the entire string it was given is
            ignorable characters; this allows trailing ignorable
            characters in a stream (e.g., trailing whitespace) to be
            eaten rather than ultimately returned as a trailing
            garbage tuple.
    """
    if decoder is None:
        # make a fresh decoder per call rather than using a shared
        # default-argument instance (which would be created just once,
        # at def time, and shared by every caller)
        decoder = MultiDecoder()

    # read() chunks in this size; arbitrary and ANY VALUE WORKS though
    # obviously there is a weak time/space tradeoff lurking in this choice.
    incrsize = 10000

    chunk = ""
    while len(newdata := readable.read(incrsize)) > 0:
        chunk += newdata
        while len(chunk) > 0:
            try:
                obj, n = decoder.raw_decode(chunk)
            except SkipAllError:
                chunk = ""        # everything is ignorable; discard it all
            except json.JSONDecodeError:
                break             # no complete json yet; append more data
            else:
                yield obj, chunk[:n]
                chunk = chunk[n:]

    # any leftover data is trailing garbage. Caller has to look for
    # this as a None JSON object in the tuple
    if len(chunk) > 0:
        yield None, chunk
# TESTS
if __name__ == '__main__':
    import io
    import random
    import unittest

    class MJTestMethods(unittest.TestCase):
        def test_whitespace(self):
            # whitespace around a single object is absorbed, not garbage
            stream = io.StringIO(" [1, 2, 3] \n ")
            results = list(loadmultijson(stream))
            self.assertEqual(len(results), 1)
            self.assertEqual(results[0][0], [1, 2, 3])
            self.assertEqual(stream.read(), '')  # should be at EOF

        def test_limit(self):
            src = "[1, 2, 3]"
            # a limit exactly the size of the input parses fine
            stream = io.StringIO(src)
            results = list(loadmultijson(
                stream, decoder=MultiDecoder(sizelimit=len(src))))
            self.assertEqual(len(results), 1)
            self.assertEqual(results[0][0], [1, 2, 3])
            self.assertEqual(stream.read(), '')  # should be at EOF
            # one character smaller and the decode must be refused
            stream = io.StringIO(src)
            with self.assertRaises(ValueError):
                list(loadmultijson(
                    stream, decoder=MultiDecoder(sizelimit=len(src) - 1)))
            self.assertEqual(stream.read(), '')  # should be at EOF

        def test_big(self):
            # padding chosen to exceed the incremental read size
            pad = " " * 123456
            stream = io.StringIO(pad + "[1, 2, 3]" + pad)
            results = list(loadmultijson(stream))
            self.assertEqual(len(results), 1)
            self.assertEqual(results[0][0], [1, 2, 3])
            self.assertEqual(stream.read(), '')

        def test_random(self):
            # try to hit chunking boundary cases with random padding
            # XXX really should do an exhaustive analysis instead of random
            for _ in range(200):
                ntuples = random.randint(10, 100)
                pieces = ["\n" * random.randint(1000, 20000)]
                for t in range(ntuples):
                    pieces.append(f"[{t}]")
                    pieces.append(" " * random.randint(1000, 20000))
                stream = io.StringIO("".join(pieces))
                count = 0
                for obj, _s in loadmultijson(stream):
                    self.assertEqual(obj, [count])
                    count += 1
                self.assertEqual(count, ntuples)
                self.assertEqual(stream.read(), '')

        def test_tight(self):
            # back-to-back representations with no separators at all
            ntuples = 1000
            stream = io.StringIO(
                "".join(json.dumps([t] * (t + 1)) for t in range(ntuples)))
            count = 0
            for obj, _s in loadmultijson(stream):
                self.assertEqual(obj, [count] * (count + 1))
                count += 1
            self.assertEqual(count, ntuples)
            self.assertEqual(stream.read(), '')

        def test_garbage(self):
            bogons = ('[1, 2, 3]{"this is bogus"}',
                      ' [1, 2, 3] {"this is still bogus"} ',
                      ' [1, 2, 3] {"k1": {"this is nestedly bogus"}}')
            for bogon in bogons:
                stream = io.StringIO(bogon)
                results = list(loadmultijson(stream))
                self.assertEqual(len(results), 2)
                self.assertEqual(results[0][0], [1, 2, 3])
                self.assertEqual(stream.read(), '')
                # trailing unparseable data shows up as a garbage tuple
                self.assertIsNone(results[1][0])

    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment