Skip to content

Instantly share code, notes, and snippets.

@bsidhom
Last active October 4, 2022 04:11
Show Gist options
  • Save bsidhom/b972b695adcd482d39412f87b707786e to your computer and use it in GitHub Desktop.
Save bsidhom/b972b695adcd482d39412f87b707786e to your computer and use it in GitHub Desktop.
Read stream of concatenated JSON contents
#!/usr/bin/env python3
import argparse
import asyncio
import codecs
import contextlib
import json
import sys
import threading
async def main():
    """Parse the command line and print every JSON document found in the input.

    The input is either the file named by ``--file`` or, when that option is
    ``-`` (the default), the binary buffer behind standard input. Each decoded
    document is re-serialized with ``json.dumps`` and printed on its own line.

    Returns:
        0, which the script entry point passes on as the process exit status.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--file",
        default="-",
        help="Input file to read from. Stdin if equal to \"-\"",
    )
    cli.add_argument(
        "--buf-size",
        default=16 * 1024,
        type=int,
        help="Read buffer size to use",
    )
    cli.add_argument(
        "--impl",
        default="async",
        choices=IMPLEMENTATIONS.keys(),
        help="Implementation to use",
    )
    options = cli.parse_args()

    def emit(document):
        # One serialized document per output line.
        print(json.dumps(document))

    if options.file != "-":
        source = await aopen(options.file, "rb")
    else:
        # nullcontext hands stdin's buffer to the `with` below without
        # closing it on exit.
        source = contextlib.nullcontext(sys.stdin.buffer)

    with source as stream:
        run = IMPLEMENTATIONS[options.impl]
        await run(stream, options.buf_size, emit)
    return 0
async def aopen(*args, **kwargs):
    """Call the builtin ``open`` in a worker thread and return its result.

    All positional and keyword arguments are forwarded to ``open`` unchanged.
    The returned object is the ordinary (synchronous) file object.
    """
    handle = await asyncio.to_thread(open, *args, **kwargs)
    return handle
async def read_json_async(input, read_size):
    """Asynchronously yield each JSON value from a stream of concatenated JSON.

    Bytes are read from ``input`` in a worker thread, decoded incrementally as
    UTF-8 (so a multi-byte character may straddle two reads), and accumulated
    in a text buffer. Whenever the front of the buffer parses as a complete
    JSON value, that value is yielded and removed from the buffer.

    Args:
        input: Binary file-like object; ``input.read(read_size)`` must return
            ``bytes``, with an empty result meaning end of input.
        read_size: Maximum number of bytes requested per read.

    Yields:
        Decoded JSON values, in stream order.

    Raises:
        Exception: At end of input, if the buffer holds text that is not a
            complete JSON value, or if the UTF-8 decoder still holds bytes of
            an unfinished character.
    """
    text_decoder = codecs.getincrementaldecoder("utf-8")()
    json_decoder = json.JSONDecoder()
    # Invariant: string_buf has no leading whitespace (raw_decode does not
    # skip it).
    string_buf = ""
    at_eof = False
    while True:
        # TODO: Consider adding a limit on string_buf size.
        if string_buf:
            # Don't bother attempting to parse if we don't have any input yet.
            try:
                value, end = json_decoder.raw_decode(string_buf)
            except json.JSONDecodeError:
                # We either need more input or this is invalid JSON.
                complete = False
            else:
                # A number is the only JSON value whose proper prefix is also
                # a valid value ("12" is a prefix of "123", "1" of "1.5e3").
                # Before EOF, hold such a value back while the text seen so
                # far could still be extended by the next read.
                may_grow = string_buf[0] in "-0123456789" and (
                    end == len(string_buf) or string_buf[end] in ".eE")
                complete = at_eof or not may_grow
            if complete:
                # Strip leading whitespace before resetting buffer.
                string_buf = string_buf[end:].lstrip()
                yield value
                continue
        if at_eof:
            if string_buf:
                raise Exception(
                    f"unexpected EOF: trailing string data: '{string_buf}'")
            undecoded = text_decoder.getstate()[0]
            if undecoded:
                raise Exception(
                    f"unexpected EOF: trailing undecoded UTF-8 bytes: {undecoded}"
                )
            # It's only safe to end if we have no decoded string content and
            # no undecoded buffer state.
            return
        # NOTE: Instead of input.read, this could be a read from any stream of
        # interest, including a first-class asyncio stream.
        b = await asyncio.to_thread(input.read, read_size)
        if len(b) == 0:
            # Loop once more so that any value deferred above is emitted (or
            # reported as trailing data) before finishing.
            at_eof = True
            continue
        s = text_decoder.decode(b)
        if not string_buf:
            # We have to strip leading whitespace to satisfy the json
            # decoder. This is only safe to do _before_ the next object.
            s = s.lstrip()
        # TODO: Figure out how to build a string efficiently.
        string_buf += s
async def read_json_queue(input, read_size):
    """Return an async generator fed by a synchronous parser in a worker thread.

    ``read_json_sync(input, read_size)`` is run in a thread; every value it
    produces is handed to the event loop through a one-slot ``asyncio.Queue``,
    so the thread waits until the previous value has been taken. A private
    sentinel marks the end of the stream. After the sentinel is seen, the
    worker task is awaited so that an exception raised while parsing reaches
    the consumer.

    Args:
        input: Binary stream passed through to ``read_json_sync``.
        read_size: Read size passed through to ``read_json_sync``.

    Returns:
        An async generator yielding the parsed values in order.
    """
    event_loop = asyncio.get_running_loop()
    handoff = asyncio.Queue(1)
    end_marker = object()
    worker = None

    def push(entry):
        # Called from the worker thread: schedule the put on the loop and
        # block this thread until the queue has accepted the entry.
        asyncio.run_coroutine_threadsafe(handoff.put(entry),
                                         event_loop).result()

    def produce():
        try:
            for parsed in read_json_sync(input, read_size):
                push(parsed)
        finally:
            # Always signal completion, even when parsing failed, so the
            # consumer does not wait forever.
            push(end_marker)

    async def consume():
        entry = await handoff.get()
        while entry is not end_marker:
            yield entry
            entry = await handoff.get()
        # Ensure that producer exited normally.
        await worker

    worker = asyncio.create_task(asyncio.to_thread(produce))
    return consume()
def read_json_sync(input, read_size):
    """Yield each JSON value from a binary stream of concatenated JSON.

    Bytes are read from ``input`` in chunks, decoded incrementally as UTF-8
    (so a multi-byte character may straddle two reads), and accumulated in a
    text buffer. Whenever the front of the buffer parses as a complete JSON
    value, that value is yielded and removed from the buffer.

    Args:
        input: Binary file-like object; ``input.read(read_size)`` must return
            ``bytes``, with an empty result meaning end of input.
        read_size: Maximum number of bytes requested per read.

    Yields:
        Decoded JSON values, in stream order.

    Raises:
        Exception: At end of input, if the buffer holds text that is not a
            complete JSON value, or if the UTF-8 decoder still holds bytes of
            an unfinished character.
    """
    text_decoder = codecs.getincrementaldecoder("utf-8")()
    json_decoder = json.JSONDecoder()
    # Invariant: string_buf has no leading whitespace (raw_decode does not
    # skip it).
    string_buf = ""
    at_eof = False
    while True:
        # TODO: Consider adding a limit on string_buf size.
        if string_buf:
            # Don't bother attempting to parse if we don't have any input yet.
            try:
                value, end = json_decoder.raw_decode(string_buf)
            except json.JSONDecodeError:
                # We either need more input or this is invalid JSON.
                complete = False
            else:
                # A number is the only JSON value whose proper prefix is also
                # a valid value ("12" is a prefix of "123", "1" of "1.5e3").
                # Before EOF, hold such a value back while the text seen so
                # far could still be extended by the next read.
                may_grow = string_buf[0] in "-0123456789" and (
                    end == len(string_buf) or string_buf[end] in ".eE")
                complete = at_eof or not may_grow
            if complete:
                # Strip leading whitespace before resetting buffer.
                string_buf = string_buf[end:].lstrip()
                yield value
                continue
        if at_eof:
            if string_buf:
                raise Exception(
                    f"unexpected EOF: trailing string data: '{string_buf}'")
            undecoded = text_decoder.getstate()[0]
            if undecoded:
                raise Exception(
                    f"unexpected EOF: trailing undecoded UTF-8 bytes: {undecoded}"
                )
            # It's only safe to end if we have no decoded string content and
            # no undecoded buffer state.
            return
        b = input.read(read_size)
        if len(b) == 0:
            # Loop once more so that any value deferred above is emitted (or
            # reported as trailing data) before finishing.
            at_eof = True
            continue
        s = text_decoder.decode(b)
        if not string_buf:
            # We have to strip leading whitespace to satisfy the json
            # decoder. This is only safe to do _before_ the next object.
            s = s.lstrip()
        # TODO: Figure out how to build a string efficiently.
        string_buf += s
async def do_async(input, buf_size, f):
    """Call ``f`` on every value produced by ``read_json_async``.

    Args:
        input: Binary stream to read from.
        buf_size: Read size handed to the reader.
        f: Callable invoked once per decoded value, in stream order.
    """
    documents = read_json_async(input, buf_size)
    async for document in documents:
        f(document)
async def do_queue(input, buf_size, f):
    """Call ``f`` on every value produced by the queue-backed reader.

    ``read_json_queue`` is a coroutine that returns the async generator, so it
    is awaited first and the result is then iterated.

    Args:
        input: Binary stream to read from.
        buf_size: Read size handed to the reader.
        f: Callable invoked once per decoded value, in stream order.
    """
    documents = await read_json_queue(input, buf_size)
    async for document in documents:
        f(document)
async def do_sync(input, buf_size, f):
    """Call ``f`` on every value produced by ``read_json_sync``.

    Declared ``async`` so it can be awaited like the other implementations,
    but the body contains no ``await``: reads happen directly in the calling
    coroutine.

    Args:
        input: Binary stream to read from.
        buf_size: Read size handed to the reader.
        f: Callable invoked once per decoded value, in stream order.
    """
    documents = read_json_sync(input, buf_size)
    for document in documents:
        f(document)
# Maps each accepted --impl value to the coroutine function that drives the
# corresponding reader; the keys double as the argparse choices in main().
IMPLEMENTATIONS = {"async": do_async, "queue": do_queue, "sync": do_sync}

if __name__ == "__main__":
    # main() returns the exit status; SystemExit carries it to the interpreter.
    raise SystemExit(asyncio.run(main()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment