Created
June 7, 2011 16:22
-
-
Save simon-engledew/1012593 to your computer and use it in GitHub Desktop.
python chunked encoding
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
def from_pattern(pattern, type):
    """Build a converter that extracts text with *pattern* and feeds it to *type*.

    The returned callable stringifies its first argument, searches that text
    with ``pattern.search`` and passes the first capture group, followed by
    any extra positional arguments, to ``type``.  It raises ``ValueError``
    when the pattern does not match anywhere in the text.
    """
    # NOTE: the ``type`` parameter shadows the builtin of the same name; it is
    # kept because it is part of this function's signature.
    def coerce(value, *args):
        text = str(value)
        found = pattern.search(text)
        if found is None:
            raise ValueError("unable to coerce '%s' into a %s" % (text, type.__name__))
        return type(found.group(1), *args)

    return coerce
# Converter that finds the first run of optionally signed hexadecimal digits
# (case-insensitive) in a value and passes it to ``int``.  Extra arguments are
# forwarded to ``int``, so the base is supplied by the caller, e.g.
# ``to_int('1A', 16)``.
to_int = from_pattern(
    re.compile('([-+]?[0-9A-F]+)', flags=re.IGNORECASE),
    int,
)
def chunked_encoder(fileish, chunk_limit=megabytes(0.5)):
    """Yield the contents of *fileish* framed as HTTP chunked transfer coding.

    Args:
        fileish: Object with a ``read(size)`` method returning ``str`` or
            ``bytes``; an empty result marks the end of the data.
        chunk_limit: Amount requested from ``fileish.read`` for each chunk.

    Yields:
        For every non-empty read, the hexadecimal size line followed by the
        payload plus CRLF; after the final (empty) read, the ``0`` size line
        and a closing CRLF.  The pieces are ``bytes`` when the stream returns
        ``bytes`` and ``str`` otherwise.
    """
    while True:
        payload = fileish.read(chunk_limit)
        # On Python 3 a binary stream returns bytes, and '%s' % payload would
        # embed its repr ("b'...'") in the output, so such data is framed as
        # bytes.  On Python 2 ``bytes`` is ``str`` and the text branch is used.
        binary = isinstance(payload, bytes) and not isinstance(payload, str)
        # NOTE: for text streams this counts characters, which only equals the
        # number of octets on the wire for single-byte data.
        size = len(payload)
        if size:
            header = '%x\r\n' % size
            if binary:
                yield header.encode('ascii')
                yield payload + b'\r\n'
            else:
                yield header
                yield '%s\r\n' % payload
        else:
            # Zero-size chunk plus the empty line that ends the message.
            if binary:
                yield b'0\r\n'
                yield b'\r\n'
            else:
                yield '0\r\n'
                yield '\r\n'
            return
# length_limit=20 | |
def chunked_decoder(fileish, chunk_limit=megabytes(1)):
    """Yield the payload of each chunk read from a chunked-encoded stream.

    Args:
        fileish: Object with ``readline(size)`` and ``read(size)`` methods;
            the first thing read from it is treated as a chunk size line.
        chunk_limit: Largest chunk size that will be accepted.

    Yields:
        The data of every chunk, in order, until a zero-size chunk is read.
        Nothing after the zero-size line (the closing CRLF or any trailer)
        is consumed.

    Raises:
        EOFError: The stream ended where a chunk size line was expected.
        ValueError: A size line does not fit within the space needed for
            ``chunk_limit`` in hexadecimal plus CRLF, or holds no number.
        OverflowError: A chunk announces a negative size or one larger than
            ``chunk_limit``.
        AssertionError: A chunk's data is not followed by CRLF.
    """
    # Room for the largest permitted size in hex *plus* its CRLF.  Without the
    # CRLF allowance a size line with as many digits as the limit would leave
    # its line ending unread and corrupt the payload that follows.
    size_line_limit = len('%x\r\n' % chunk_limit)
    while True:
        index = fileish.readline(size_line_limit)
        if not index:
            raise EOFError("unexpected blank line")
        # Python 3 binary streams return bytes; str() of bytes is its repr,
        # whose leading "b" would itself parse as a hexadecimal digit.
        binary = isinstance(index, bytes) and not isinstance(index, str)
        if binary:
            index = index.decode('latin-1')
        # A full-length read without a newline means the line was cut off by
        # the limit; parsing the fragment would silently misread the stream.
        if len(index) >= size_line_limit and not index.endswith('\n'):
            raise ValueError("chunk size line exceeds %d characters" % size_line_limit)
        length = to_int(index, 16)
        if not length:
            return
        # The size pattern accepts a sign, and a negative size passed to
        # read() would consume the rest of the stream, so reject it too.
        if length < 0 or length > chunk_limit:
            raise OverflowError("invalid chunk size of '%d' requested, max is '%d'" % (length, chunk_limit))
        value = fileish.read(length)
        yield value
        tail = fileish.read(2)
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if tail != (b"\r\n" if binary else "\r\n"):
            raise AssertionError("unexpected characters '%s' after chunk" % tail)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment