@njsmith
Created July 19, 2018 22:53
# Really need to factor out a bunch of this into reusable logic
# That's what I eventually want to do with the sansio_toolbelt library
# But for now, this may be handy...
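async def receive_lines(rstream, *, receive_size=16384, max_line_size=16384):
    """Async generator yielding newline-terminated lines from rstream.

    Each line is yielded as a bytearray without its trailing b"\n".
    """
    buf = bytearray()
    # Offset up to which buf is already known to contain no newline
    searched = 0
    while True:
        found = buf.find(b"\n", searched)
        if found == -1:
            # No newline yet
            if len(buf) > max_line_size:
                raise RuntimeError("line is toooooo looooonnng")
            searched = len(buf)
            new_data = await rstream.receive_some(receive_size)
            if not new_data:
                # EOF
                if buf:
                    raise RuntimeError("final line truncated?!")
                return
            buf += new_data
        else:
            yield buf[:found]
            searched = 0
            # Drop the yielded line together with its newline
            del buf[:found + 1]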

# Usage (inside an async function):
async for line in receive_lines(stream):
    ...
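
The usage above can be tried without a real network stream. The following is a minimal sketch, not part of the original gist: `FakeStream` and `collect_lines` are hypothetical names introduced here, and it runs under `asyncio` on the assumption that the generator only needs an object with an awaitable `receive_some` method (the original presumably targets a Trio stream). `receive_lines` is repeated so the block is self-contained.

```python
import asyncio


async def receive_lines(rstream, *, receive_size=16384, max_line_size=16384):
    # Same logic as the generator above, repeated so this block runs on its own.
    buf = bytearray()
    searched = 0
    while True:
        found = buf.find(b"\n", searched)
        if found == -1:
            if len(buf) > max_line_size:
                raise RuntimeError("line is toooooo looooonnng")
            searched = len(buf)
            new_data = await rstream.receive_some(receive_size)
            if not new_data:
                if buf:
                    raise RuntimeError("final line truncated?!")
                return
            buf += new_data
        else:
            yield buf[:found]
            searched = 0
            del buf[:found + 1]


class FakeStream:
    """Hypothetical stand-in for a stream: hands out fixed chunks, then b"" for EOF."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def receive_some(self, max_bytes):
        # Ignores max_bytes for simplicity; the chunks used here are tiny.
        return self._chunks.pop(0) if self._chunks else b""


async def collect_lines(chunks):
    # Copy each yielded bytearray into immutable bytes for easy comparison.
    return [bytes(line) async for line in receive_lines(FakeStream(chunks))]


# Chunk boundaries deliberately do not line up with line boundaries.
lines = asyncio.run(collect_lines([b"hel", b"lo\nwor", b"ld\n", b"a\nb\n"]))
print(lines)
```

Lines split across chunks are reassembled, and a chunk holding several newlines yields several lines before the next read. If the stream ends with data that has no trailing newline, the generator raises `RuntimeError` rather than yielding a partial line.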