Created
March 2, 2024 20:51
-
-
Save SubOptimal/2e847050d30848864f6a96c02c77f767 to your computer and use it in GitHub Desktop.
Read last 5 lines from a file of any length
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Solution for the challenge https://mastodon.social/@willmcgugan/112025856092773557

How to execute:

1. The very first run of the script generates the example file with
   10 million lines of random lengths.
2. Drop the file system caches [1]:
       echo 3 | sudo tee /proc/sys/vm/drop_caches
3. Run the script to get the time it takes to read the last five lines from
   the uncached file.
4. Run the script again to get the time for the cached read.

Example output:

    $ python3 last_five.py
    [+] generating example.txt
    $ echo 3 | sudo tee /proc/sys/vm/drop_caches
    $ python3 last_five.py
    [+] timeit: 0m 0.000511s
    09999996 xxxx [truncated line]
    09999997 xxxx [truncated line]
    09999998 xxxx [truncated line]
    09999999 xxxx [truncated line]
    10000000 xxxx [truncated line]
    $ python3 last_five.py
    [+] timeit: 0m 0.000058s
    09999996 xxxx [truncated line]
    09999997 xxxx [truncated line]
    09999998 xxxx [truncated line]
    09999999 xxxx [truncated line]
    10000000 xxxx [truncated line]

[1] https://www.kernel.org/doc/html/latest/admin-guide/sysctl/vm.html?highlight=drop_caches
""" | |
import os | |
import random | |
import timeit | |
# Name of the benchmark input file; it is opened relative to the current
# working directory.
EXAMPLE_FILE_NAME = "example.txt"
# Number of bytes fetched per backwards read while scanning for linefeeds.
BLOCK_SIZE = 8192
def tail_five(file_path, n_lines=5, block_size=None):
    """Return the last lines of a file without reading the whole file.

    The file is scanned backwards from its end in fixed-size blocks until
    enough linefeed bytes have been seen, then only the tail is read forward.

    Args:
        file_path: Path of the file to read. It is opened in binary mode.
        n_lines: Number of trailing lines to return (default 5).
        block_size: Bytes fetched per backwards read. ``None`` (the default)
            uses the module-level ``BLOCK_SIZE``.

    Returns:
        A list of at most ``n_lines`` strings, oldest first, decoded as UTF-8
        with the line terminator (``\\n`` or ``\\r\\n``) removed. Whitespace
        that is part of the line content is preserved. An empty file, or
        ``n_lines < 1``, yields an empty list.

    Raises:
        ValueError: If ``block_size`` is smaller than 1.
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If a returned line is not valid UTF-8.
    """
    if block_size is None:
        block_size = BLOCK_SIZE
    if block_size < 1:
        raise ValueError("block_size must be a positive integer")
    if n_lines < 1:
        return []

    with open(file_path, "rb") as file:
        # seek() returns the new absolute position, which is the file size.
        tail_pos = file.seek(0, os.SEEK_END)
        linefeed_count = 0
        linefeed_needed = n_lines
        is_tail_block = True
        block = b""

        while tail_pos > 0 and linefeed_count < linefeed_needed:
            read_size = min(block_size, tail_pos)
            tail_pos -= read_size
            file.seek(tail_pos)
            block = file.read(read_size)
            if is_tail_block and block.endswith(b"\n"):
                # A linefeed that terminates the file does not separate two
                # lines, so one more linefeed has to be found.
                linefeed_needed += 1
            is_tail_block = False
            linefeed_count += block.count(b"\n")

        # Any surplus linefeeds all sit in the block read last (the one
        # closest to the start of the file); skip past them so that exactly
        # the requested number of lines follows the offset.
        offset = 0
        while linefeed_count >= linefeed_needed:
            offset = block.find(b"\n", offset) + 1
            linefeed_count -= 1

        file.seek(tail_pos + offset)
        # Remove only the line terminator, never whitespace inside the line.
        return [line.rstrip(b"\r\n").decode("utf-8") for line in file]
def generate_example_file():
    """Write the benchmark input file named by ``EXAMPLE_FILE_NAME``.

    The file holds 10 million lines. Each line is its 1-based line number,
    zero-padded to a common width, followed by a space and a run of between
    10 and 1000 ``x`` characters (length chosen by ``random.randint``).
    """
    print(f"[+] generating {EXAMPLE_FILE_NAME}")
    total = 10_000_000
    # Pad every line number to the width of the largest one.
    width = len(str(total))
    with open(EXAMPLE_FILE_NAME, "w") as out:
        out.writelines(
            f"{number:0{width}d} {'x' * random.randint(10, 1000)}\n"
            for number in range(1, total + 1)
        )
# Filled in by wrapper(). timeit.timeit() returns only the elapsed time, so
# the lines are handed back to the script through this module-level name.
lines = []


def wrapper():
    """Run ``tail_five`` on the example file and store the result in ``lines``."""
    global lines
    result = tail_five(EXAMPLE_FILE_NAME)
    lines = result
if __name__ == "__main__":
    if os.path.exists(EXAMPLE_FILE_NAME):
        # The example file is present: time a single tail read and show
        # the lines that were found.
        elapsed = timeit.timeit(wrapper, globals=globals(), number=1)
        whole_minutes, rest_seconds = divmod(elapsed, 60)
        print(f"[+] timeit: {int(whole_minutes)}m {rest_seconds:.6f}s")
        for tail_line in lines:
            print(tail_line)
    else:
        # First run: only create the example file; timing happens on the
        # next invocation.
        generate_example_file()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment