Read a stream of concatenated JSON values incrementally, with synchronous, asyncio, and queue-bridged implementations.
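For context (not part of the gist itself): "concatenated JSON" here means top-level JSON values written back to back, separated only by optional whitespace, with no enclosing array or delimiters. A made-up sample of such a stream, shown as the Python bytes literal the readers below would consume:

# Hypothetical sample input: three top-level JSON values back to back,
# separated only by optional whitespace; no enclosing array, no commas.
SAMPLE = b'{"id": 1, "tags": ["a", "b"]} {"id": 2}\n[3, 4, 5]'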
#!/usr/bin/env python3
"""Read a stream of concatenated JSON values and echo each one to stdout."""

import argparse
import asyncio
import codecs
import contextlib
import json
import sys


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--file",
        help="Input file to read from. Stdin if equal to \"-\"",
        default="-")
    parser.add_argument("--buf-size",
                        type=int,
                        help="Read buffer size to use",
                        default=16 * 1024)
    parser.add_argument("--impl",
                        help="Implementation to use",
                        default="async",
                        choices=IMPLEMENTATIONS.keys())
    args = parser.parse_args()
    if args.file == "-":
        cm = contextlib.nullcontext(sys.stdin.buffer)
    else:
        cm = await aopen(args.file, "rb")
    with cm as f:
        await IMPLEMENTATIONS[args.impl](f, args.buf_size,
                                         lambda j: print(json.dumps(j)))
    return 0


async def aopen(*args, **kwargs):
    # Open the file on a worker thread so the event loop is never blocked.
    return await asyncio.to_thread(open, *args, **kwargs)


async def read_json_async(input, read_size):
    """Asynchronously yield JSON values parsed incrementally from `input`."""
    text_decoder = codecs.getincrementaldecoder("utf-8")()
    json_decoder = json.JSONDecoder()
    # Invariant: string_buf has no leading whitespace.
    string_buf = ""
    while True:
        # TODO: Consider adding a limit on string_buf size.
        if len(string_buf) > 0:
            # Don't bother attempting to parse if we don't have any input
            # yet.
            try:
                j, index = json_decoder.raw_decode(string_buf)
                # Strip leading whitespace before resetting buffer.
                string_buf = string_buf[index:].lstrip()
                yield j
                continue
            except json.JSONDecodeError:
                # We either need more input or this is invalid JSON.
                pass
        # NOTE: Instead of input.read, this could be a read from any stream
        # of interest, including a first-class asyncio stream.
        b = await asyncio.to_thread(input.read, read_size)
        if len(b) == 0:
            if len(string_buf) > 0:
                raise Exception(
                    f"unexpected EOF: trailing string data: '{string_buf}'")
            elif len(text_decoder.getstate()[0]) > 0:
                raise Exception(
                    f"unexpected EOF: trailing undecoded UTF-8 bytes: {text_decoder.getstate()[0]}"
                )
            # It's only safe to end if we have no decoded string content and
            # no undecoded buffer state.
            return
        s = text_decoder.decode(b)
        if len(string_buf) == 0:
            # We have to strip leading whitespace to satisfy the json
            # decoder. This is only safe to do _before_ the next object.
            s = s.lstrip()
        # TODO: Figure out how to build a string efficiently.
        string_buf += s


async def read_json_queue(input, read_size):
    """Run the synchronous reader on a worker thread and bridge its values
    back to the event loop through an asyncio queue."""
    queue = asyncio.Queue(1)
    loop = asyncio.get_running_loop()
    produce_task = None
    done = object()

    async def consume():
        while True:
            item = await queue.get()
            if item is done:
                break
            yield item
        # Ensure that producer exited normally.
        await produce_task

    def produce():
        try:
            for item in read_json_sync(input, read_size):
                asyncio.run_coroutine_threadsafe(queue.put(item),
                                                 loop).result()
        finally:
            asyncio.run_coroutine_threadsafe(queue.put(done), loop).result()

    produce_task = asyncio.create_task(asyncio.to_thread(produce))
    return consume()


def read_json_sync(input, read_size):
    """Synchronously yield JSON values parsed incrementally from `input`."""
    text_decoder = codecs.getincrementaldecoder("utf-8")()
    json_decoder = json.JSONDecoder()
    # Invariant: string_buf has no leading whitespace.
    string_buf = ""
    while True:
        # TODO: Consider adding a limit on string_buf size.
        if len(string_buf) > 0:
            # Don't bother attempting to parse if we don't have any input
            # yet.
            try:
                j, index = json_decoder.raw_decode(string_buf)
                # Strip leading whitespace before resetting buffer.
                string_buf = string_buf[index:].lstrip()
                yield j
                continue
            except json.JSONDecodeError:
                # We either need more input or this is invalid JSON.
                pass
        b = input.read(read_size)
        if len(b) == 0:
            if len(string_buf) > 0:
                raise Exception(
                    f"unexpected EOF: trailing string data: '{string_buf}'")
            elif len(text_decoder.getstate()[0]) > 0:
                raise Exception(
                    f"unexpected EOF: trailing undecoded UTF-8 bytes: {text_decoder.getstate()[0]}"
                )
            # It's only safe to end if we have no decoded string content and
            # no undecoded buffer state.
            return
        s = text_decoder.decode(b)
        if len(string_buf) == 0:
            # We have to strip leading whitespace to satisfy the json
            # decoder. This is only safe to do _before_ the next object.
            s = s.lstrip()
        # TODO: Figure out how to build a string efficiently.
        string_buf += s


async def do_async(input, buf_size, f):
    async for j in read_json_async(input, buf_size):
        f(j)


async def do_queue(input, buf_size, f):
    async for j in await read_json_queue(input, buf_size):
        f(j)


async def do_sync(input, buf_size, f):
    for j in read_json_sync(input, buf_size):
        f(j)


IMPLEMENTATIONS = {
    "async": do_async,
    "queue": do_queue,
    "sync": do_sync,
}

if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
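As a rough usage sketch (not part of the original gist), the reader generators can be driven directly against an in-memory stream. The module name read_json is hypothetical; it assumes the script above was saved as read_json.py on the import path, and the sample data is made up for illustration.

import asyncio
import io

from read_json import read_json_async, read_json_sync  # hypothetical module name

# Three top-level JSON values back to back, separated only by whitespace.
DATA = b'{"id": 1, "tags": ["a", "b"]} {"id": 2}\n[3, 4, 5]'


def demo_sync():
    # A small read_size forces several partial parses before each value
    # completes, exercising the incremental buffering.
    for value in read_json_sync(io.BytesIO(DATA), read_size=8):
        print("sync:", value)


async def demo_async():
    async for value in read_json_async(io.BytesIO(DATA), read_size=8):
        print("async:", value)


demo_sync()
asyncio.run(demo_async())

The same kind of input can also be piped to the script on stdin, with --impl selecting the async, queue, or sync driver and --buf-size controlling the chunk size.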