Last active
August 9, 2024 18:42
-
-
Save amitsaha/5990310 to your computer and use it in GitHub Desktop.
Simple implementation of the tail command in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Basic tail command implementation | |
Usage: | |
tail.py filename numlines | |
''' | |
import sys | |
import linecache | |
def last_lines(fname, nlines):
    """Return the last `nlines` lines of the text file `fname`.

    The file is streamed once through a bounded deque, so at most `nlines`
    lines are held in memory at any time and the handle is always closed.

    Args:
        fname: path of the file to read.
        nlines: maximum number of trailing lines to return.

    Returns:
        A list of lines (line endings preserved). It is shorter than
        `nlines` when the file has fewer lines, and empty when
        `nlines <= 0`.
    """
    # Local import: this script's import header only provides sys/linecache.
    from collections import deque

    if nlines <= 0:
        return []
    with open(fname) as handle:
        return list(deque(handle, maxlen=nlines))


def _main(argv):
    """Command-line entry point; returns the process exit status."""
    usage = 'Usage: tail.py <file> <nlines>'
    if len(argv) != 3:
        print(usage)
        return 1

    # filename and number of lines requested
    fname, nlines = argv[1:]
    try:
        nlines = int(nlines)
    except ValueError:
        print(usage)
        return 1

    # Lines keep their own newline, so write them out untouched.
    sys.stdout.writelines(last_lines(fname, nlines))
    return 0


if __name__ == '__main__':
    sys.exit(_main(sys.argv))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" This is a more efficient version, since it does not read the entire | |
file | |
""" | |
import sys | |
import os | |
BUFSIZE = 8192


def tail_lines(fname, lines, bufsize=BUFSIZE):
    """Return the last `lines` lines of `fname` without reading the whole file.

    The file is opened in binary mode and read backwards from the end in
    blocks of `bufsize` bytes. Reading stops once more than `lines` newline
    bytes have been seen (or the start of the file is reached), which
    guarantees that the first returned line is complete and not a fragment
    that begins in the middle of a line.

    Args:
        fname: path of the file to read.
        lines: maximum number of trailing lines to return.
        bufsize: number of bytes fetched per backwards step; must be >= 1.

    Returns:
        A list of `bytes` objects, one per line, line endings preserved.
        Empty when `lines <= 0` or the file is empty.

    Raises:
        ValueError: if `bufsize` is smaller than 1.
    """
    if bufsize < 1:
        raise ValueError('bufsize must be at least 1')
    if lines <= 0:
        return []

    blocks = []
    newlines_seen = 0
    with open(fname, 'rb') as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        # One newline more than `lines` is needed: it is the terminator of
        # the line *before* the first line we want to keep.
        while pos > 0 and newlines_seen <= lines:
            step = min(bufsize, pos)  # never seek before the file start
            pos -= step
            f.seek(pos)
            block = f.read(step)
            blocks.append(block)
            newlines_seen += block.count(b'\n')

    # Blocks were collected back-to-front; restore file order before splitting.
    data = b''.join(reversed(blocks))
    return data.splitlines(True)[-lines:]


def _main(argv):
    """Command-line entry point: tail.py <nlines> <file>; returns exit status."""
    usage = 'Usage: tail.py <nlines> <file>'
    if len(argv) != 3:
        print(usage)
        return 1
    try:
        lines = int(argv[1])
    except ValueError:
        print(usage)
        return 1
    fname = argv[2]

    # Emit the raw bytes so the output is identical to the file's tail,
    # with no extra trailing newline and no re-encoding.
    sys.stdout.buffer.write(b''.join(tail_lines(fname, lines)))
    return 0


if __name__ == '__main__':
    sys.exit(_main(sys.argv))
@amit, there is a small flaw here. If the seek offset fsize - bufsize*iter lands inside the first of the N lines to be printed, then the first item in data[-lines:] can start anywhere between the beginning and the end of that line.
Say the line is " John Johny yes pappa": data[-lines:][0] can be "ny yes pappa", since f.seek(fsize - bufsize*iter) can land anywhere in the line.
Changing `if len(data) >= lines or f.tell() == 0:` to `if len(data) > lines or f.tell() == 0:` should fix the issue.
Python 3.8 asyncio version,
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# >>
# Blake VandeMerwe, LiveViewTech
# <<
import asyncio
import io
import os
import time
from functools import partial
from typing import AsyncIterator
# Passed as open()'s `buffering` argument; 1 selects line buffering in text mode.
LINE_BUFFER = 1


async def tail(
    filename: str,
    last_lines: int = 10,
    non_exist_max_secs: float = 30.0,
    fp_poll_secs: float = 0.125
) -> AsyncIterator[str]:
    """Continuously tail a file, yielding one right-stripped line at a time.

    First yields up to `last_lines` existing lines taken from the final
    filesystem block of the file, then polls the file for growth and yields
    whatever is appended. Rotation (same name, different inode) and
    truncation are detected and reading restarts from the beginning.

    Args:
        filename: path of the file to follow.
        last_lines: number of existing lines to emit before following;
            values <= 0 skip straight to the end of the file. At most one
            filesystem block of existing content is inspected.
        non_exist_max_secs: how long to wait for a missing file to appear
            before the generator finishes.
        fp_poll_secs: sleep interval between polls.

    Yields:
        Each line with trailing whitespace (including the newline) removed.
        A line that has not been terminated yet is yielded as-is.

    The generator finishes silently if the file does not appear in time or
    an OSError occurs while following; the file handle is closed whenever
    the generator ends or is closed by the consumer.
    """

    async def wait_exists() -> bool:
        """Wait for a file to exist, the return statement reflects
        whether or not the file existed when the timeout limits were reached."""
        bail_at: float = time.monotonic() + non_exist_max_secs
        while not os.path.exists(filename):
            if time.monotonic() >= bail_at:
                return False
            await asyncio.sleep(fp_poll_secs)
        return True

    async def check_rotate(_fp) -> io.TextIOBase:
        """Determine if the file rotated in place; same name different inode.

        Returns a handle on the new file (closing the old one) when the inode
        changed, otherwise returns `_fp` unchanged.
        """
        nonlocal fino
        try:
            if os.stat(filename).st_ino == fino:
                return _fp
            new_fp = open(filename, 'r')
        except FileNotFoundError:
            # The file vanished between the caller's existence check and
            # here (mid-rotation); keep the old handle and retry next poll.
            return _fp
        _fp.close()
        new_fp.seek(0, os.SEEK_SET)
        fino = os.fstat(new_fp.fileno()).st_ino
        return new_fp

    # ~~

    if not await wait_exists():
        return

    stat = os.stat(filename)
    fino: int = stat.st_ino
    size: int = stat.st_size

    # os.statvfs is POSIX-only; fall back to the io default elsewhere.
    statvfs = getattr(os, 'statvfs', None)
    blocksize: int = (
        statvfs(filename).f_bsize if statvfs else io.DEFAULT_BUFFER_SIZE
    )

    fp = open(filename, 'r', LINE_BUFFER)
    try:
        if last_lines > 0:
            if stat.st_size <= blocksize:
                # the file fits in one block: read all the lines
                for line in fp.readlines()[-last_lines:]:
                    yield line.rstrip()
            else:
                # otherwise seek one block back from the end and drop the
                # first element, which is potentially a partial line.
                fp.seek(os.fstat(fp.fileno()).st_size - blocksize)
                for line in fp.readlines()[1:][-last_lines:]:
                    yield line.rstrip()
            # readlines() left us at EOF. Do NOT seek to the end again:
            # data appended while the consumer held the lines above would
            # be skipped.
        else:
            # nothing to replay: start following from the current end
            fp.seek(0, os.SEEK_END)

        try:
            while True:
                # wait for the file to exist -- generously
                if not os.path.exists(filename):
                    if not await wait_exists():
                        return

                current = await check_rotate(fp)
                if current is not fp:
                    fp = current
                    # A fresh file: forget the old size so that a new file of
                    # the same length is still read from the start.
                    size = 0

                n_size = os.fstat(fp.fileno()).st_size

                # if the file is the same size, churn
                # .. this could be error-prone on small files that
                # rotate VERY fast, but that's an edge case for
                # tailing a persistent log file.
                if n_size == size:
                    await asyncio.sleep(fp_poll_secs)
                    continue

                # if the file shrank, seek to the beginning
                if n_size < size:
                    fp.seek(0, os.SEEK_SET)

                size = n_size

                # readline() returns '' at EOF; reading line by line avoids
                # splitting a line at an arbitrary block boundary.
                for line in iter(fp.readline, ''):
                    yield line.rstrip()
        except OSError:
            # Best effort: an I/O failure while following ends the stream.
            return
    finally:
        fp.close()
if __name__ == '__main__':
    async def main():
        """Print every line produced by tailing the example log file."""
        async for line in tail(r'/etc/foldingathome/log.txt'):
            print(line)

    # asyncio.run creates the loop, cancels outstanding tasks, finalises
    # async generators and closes the loop, so no manual stop()/close()
    # calls are needed; Ctrl-C just ends the program quietly.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
This is great, but you don't need the # -*- coding: utf-8 -*-
line in Python 3
Python 3.8 asyncio version,
#! /usr/bin/env python # -*- coding: utf-8 -*- ...
@therumbler TIL, thanks! I've been using the same "new file" template for years. Time to update!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Implementation of
tail -n k
. This uses offset and doesn't read the whole line. Imagine the line is 10GB large...