Last active
September 10, 2018 21:34
-
-
Save almarklein/52e6d27d88fd46688006da0b0c697935 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implementation of an iterator that yields the lines in a file in reverse order. | |
Copyright (C) 2018, Almar Klein | |
BSD licensed. | |
""" | |
## ===== The code ===== | |
def readlines_reversed(f):
    """ Iterate over the lines in a file in reverse order.

    The file must be seekable and open in 'rb' mode. Yields the lines
    unencoded (as bytes), including their line ending. Lines are delimited
    like ``bytes.splitlines(True)`` does it: LF, CR and CRLF each end a
    line. The result therefore equals ``reversed(f.read().splitlines(True))``
    without loading the whole file into memory. Note that this differs from
    ``reversed(f.readlines())`` when the data contains bare CR line endings,
    because ``readlines()`` on a binary file only splits on LF.

    No bytes are lost or altered: concatenating the yielded lines in
    reversed order gives back the exact file content. The file position is
    moved by the iteration.
    """
    line_ends = (b"\n", b"\r")
    f.seek(0, 2)  # whence=2: start at the end of the file
    pos = f.tell()
    blocksize, max_blocksize = 64, 4096
    # Pieces of the line that starts the most recently read block. That line
    # may continue into earlier blocks, so it is held back. The pieces are
    # stored last-to-first so that "prepending" an earlier piece is a cheap
    # append; joining once at the end avoids re-copying a long line for
    # every block it spans.
    pending = []
    while pos > 0:
        # Read the block preceding the previous one. The block size doubles
        # on every step (first read is 128 bytes) up to max_blocksize.
        blocksize = min(blocksize * 2, max_blocksize)
        block_end = pos
        pos = max(0, pos - blocksize)
        f.seek(pos)
        lines = f.read(block_end - pos).splitlines(True)
        # Align to line breaks: the last line of this block belongs to the
        # pending line if it has no line ending, or if the block boundary
        # fell right in the middle of a CRLF pair.
        tail = lines[-1]
        crlf_split = tail.endswith(b"\r") and pending == [b"\n"]
        if crlf_split or not tail.endswith(line_ends):
            pending.append(lines.pop())
            if not lines:
                # The whole block is part of the pending line; keep reading.
                continue
        # Whatever is pending is now known to be a complete line.
        if pending:
            yield b"".join(reversed(pending))
        # The first line of this block may be cut off at its start, so it
        # becomes the new pending line; the others are complete.
        pending = [lines[0]]
        for line in reversed(lines[1:]):
            yield line
    # Start of file reached: what is still pending is the first line.
    if pending:
        yield b"".join(reversed(pending))
## ===== The tests ===== | |
import io | |
import random | |
# Small multi-line sample text (not referenced by the tests visible here).
LINES1 = """
foo
bar
x
"""
def reversetext(text):
    """ Return text with its lines in reverse order.

    The text is encoded to bytes, its lines are read back-to-front with
    readlines_reversed(), and the concatenated result is decoded again.
    """
    stream = io.BytesIO(text.encode())
    reversed_bytes = b''.join(readlines_reversed(stream))
    return reversed_bytes.decode()
def splitted(text):
    """ Split text into a list of lines, in their original order.

    The lines are obtained via readlines_reversed() on the encoded text and
    then put back into forward order, each decoded to str.
    """
    stream = io.BytesIO(text.encode())
    decoded = [chunk.decode() for chunk in readlines_reversed(stream)]
    decoded.reverse()
    return decoded
def test_readlines_reserved():
    """ Check readlines_reversed() on small, hand-picked inputs. """
    # Empty
    assert reversetext('') == ''
    assert splitted('') == []

    # Single chars: reversing a one-character text must not change it
    for char in 'x \t\r\n€Ř"\\\'"':
        assert reversetext(char) == char

    # Pairs of (input text, expected lines in forward order)
    cases = [
        # Single chars
        ('x', ['x']),
        (' ', [' ']),
        ('\n', ['\n']),
        ('\r', ['\r']),
        # Few chars
        ('x\n', ['x\n']),
        ('x\r', ['x\r']),
        ('x\r\n', ['x\r\n']),
        ('x\n ', ['x\n', ' ']),
        ('x\r ', ['x\r', ' ']),
        ('x\r\n ', ['x\r\n', ' ']),
        # Special cases
        ('\n\n\n', ['\n', '\n', '\n']),
        ('\n\r\r\n', ['\n', '\r', '\r\n']),
        (' \n ', [' \n', ' ']),
    ]
    for text, expected in cases:
        assert splitted(text) == expected
def test_readlines_reserved_random():
    """ Check readlines_reversed() against randomly generated texts.

    Each text consists of lines of random characters (including multi-byte
    ones) that end in a randomly chosen CR, LF or CRLF. The lines yielded by
    readlines_reversed(), put back in forward order, must equal the
    generated lines.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz €Ř'
    line_endings = ('\r', '\n', '\r\n')

    def randomtest(maxlines, maxchars):
        # Generates between 1 and maxlines lines; every line has exactly
        # maxchars characters plus a line ending.
        reflines = []
        for _ in range(random.randint(1, maxlines)):
            line = ''.join(random.choice(alphabet) for _ in range(maxchars))
            line += random.choice(line_endings)
            reflines.append(line)
        f = io.BytesIO(''.join(reflines).encode())
        testlines = [line.decode() for line in readlines_reversed(f)]
        testlines.reverse()
        assert reflines == testlines

    # You want avg number of chars to be >> 4096 (buffersize)
    scenarios = (
        (25000, 4),  # real short lines
        (1250, 80),  # pretty normal lines
        (5, 20000),  # real long lines
    )
    for maxlines, maxchars in scenarios:
        for _ in range(10):
            randomtest(maxlines, maxchars)
# Run all tests when this file is executed as a script.
if __name__ == '__main__':
    for run_test in (test_readlines_reserved, test_readlines_reserved_random):
        run_test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment