Skip to content

Instantly share code, notes, and snippets.

@SubOptimal
Created March 2, 2024 20:51
Show Gist options
  • Save SubOptimal/2e847050d30848864f6a96c02c77f767 to your computer and use it in GitHub Desktop.
Save SubOptimal/2e847050d30848864f6a96c02c77f767 to your computer and use it in GitHub Desktop.
Read last 5 lines from a file of any length
"""
Solution for the challenge https://mastodon.social/@willmcgugan/112025856092773557
How to execute:
1. the very first run of the script generates the example file with
10 million lines of random length
2. drop file system caches [1]
echo 3 | sudo tee /proc/sys/vm/drop_caches
3. run the script to get the time it takes to read the last five lines from
the uncached file
4. run the script again to get the time for the cached read
example output:
$ python3 last_five.py
[+] generating example.txt
$ echo 3 | sudo tee /proc/sys/vm/drop_caches
$ python3 last_five.py
[+] timeit: 0m 0.000511s
09999996 xxxx [truncated line]
09999997 xxxx [truncated line]
09999998 xxxx [truncated line]
09999999 xxxx [truncated line]
10000000 xxxx [truncated line]
$ python3 last_five.py
[+] timeit: 0m 0.000058s
09999996 xxxx [truncated line]
09999997 xxxx [truncated line]
09999998 xxxx [truncated line]
09999999 xxxx [truncated line]
10000000 xxxx [truncated line]
[1] https://www.kernel.org/doc/html/latest/admin-guide/sysctl/vm.html?highlight=drop_caches
"""
import os
import random
import timeit
# Name of the sample file: generated when it does not exist yet, read by the
# timed run otherwise.
EXAMPLE_FILE_NAME = "example.txt"
# Number of bytes tail_five() reads per step while scanning backwards from the
# end of the file.
BLOCK_SIZE = 8192
def tail_five(file_path, line_count=5, block_size=None):
    """Return the last lines of a file without reading the whole file.

    The file is scanned backwards in blocks until enough linefeed bytes have
    been seen, then the remaining tail is read forwards line by line.

    Args:
        file_path: Path of the file to read.
        line_count: Number of trailing lines to return (default 5).
        block_size: Number of bytes read per backward step; defaults to the
            module-level BLOCK_SIZE.

    Returns:
        A list with at most ``line_count`` strings in file order. Only the
        line terminator is removed from each line; any other leading or
        trailing whitespace is kept. An empty file or a non-positive
        ``line_count`` yields an empty list.

    Raises:
        ValueError: If ``block_size`` is not positive.
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If a returned line is not valid UTF-8.
    """
    if line_count <= 0:
        return []
    if block_size is None:
        block_size = BLOCK_SIZE
    if block_size <= 0:
        raise ValueError("block_size must be positive")

    with open(file_path, "rb") as file:
        size = file.seek(0, os.SEEK_END)
        position = size
        needed = line_count
        found = 0
        block = b""
        while position > 0 and found < needed:
            is_tail_block = position == size
            step = min(block_size, position)
            position -= step
            file.seek(position)
            block = file.read(step)
            if is_tail_block and block.endswith(b"\n"):
                # A linefeed at the very end of the file terminates the last
                # line instead of separating two lines, so one more linefeed
                # has to be found.
                needed += 1
            found += block.count(b"\n")

        # The earliest block may contain more linefeeds than required; skip
        # the surplus ones so that reading starts right behind the linefeed
        # that precedes the first wanted line.
        offset = 0
        for _ in range(found - needed + 1):
            offset = block.find(b"\n", offset) + 1

        file.seek(position + offset)
        # Remove only the line terminator so whitespace that belongs to the
        # line content is preserved.
        return [raw.rstrip(b"\r\n").decode("utf-8") for raw in file]
def generate_example_file():
    """Write the sample file used by the timing run.

    The file gets 10 million lines. Every line consists of its zero-padded
    line number, a space and between 10 and 1000 ``x`` characters.
    """
    print(f"[+] generating {EXAMPLE_FILE_NAME}")
    total = 10_000_000
    # Pad every line number to the width of the largest one.
    width = len(str(total))
    with open(EXAMPLE_FILE_NAME, "w") as out:
        out.writelines(
            f"{number:0{width}d} {'x' * random.randint(10, 1000)}\n"
            for number in range(1, total + 1)
        )
# Holds the result of the timed call so it can be printed afterwards.
lines = []


def wrapper():
    """Run tail_five() on the example file and keep the result in ``lines``.

    timeit does not hand back the return value of the timed callable, so the
    result is stored in the module-level variable instead.
    """
    global lines
    result = tail_five(EXAMPLE_FILE_NAME)
    lines = result
if __name__ == "__main__":
    if os.path.exists(EXAMPLE_FILE_NAME):
        # The example file is present: time a single read of its tail and
        # print the timing followed by the lines that were read.
        elapsed = timeit.timeit(wrapper, globals=globals(), number=1)
        whole_minutes, rest_seconds = divmod(elapsed, 60)
        print(f"[+] timeit: {int(whole_minutes)}m {rest_seconds:.6f}s")
        for entry in lines:
            print(entry)
    else:
        # First run: only create the example file.
        generate_example_file()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment