-
-
Save amitsaha/5990310 to your computer and use it in GitHub Desktop.
'''
Basic tail command implementation

Usage:
    tail.py filename numlines
'''
import sys
import linecache


def tail_lines(fname, nlines):
    """Return the last *nlines* lines of *fname* as a list of strings.

    A single pass over the file replaces the original's two passes
    (``readlines()`` to count the lines, then ``linecache.getline`` per
    line), and the ``with`` block closes the handle deterministically.

    Returns [] when *nlines* <= 0; fewer than *nlines* lines are
    returned when the file is shorter than requested.
    """
    if nlines <= 0:
        return []
    with open(fname) as f:
        return f.readlines()[-nlines:]


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print('Usage: tail.py <file> <nlines>')
        sys.exit(1)
    # filename and number of lines requested
    fname, nlines = sys.argv[1:]
    for line in tail_lines(fname, int(nlines)):
        # lines keep their trailing newline, so suppress print's own
        print(line, end='')
""" This is a more efficient version, since it does not read the entire | |
file | |
""" | |
import sys | |
import os | |
bufsize = 8192 | |
lines = int(sys.argv[1]) | |
fname = sys.argv[2] | |
fsize = os.stat(fname).st_size | |
iter = 0 | |
with open(sys.argv[2]) as f: | |
if bufsize > fsize: | |
bufsize = fsize-1 | |
data = [] | |
while True: | |
iter +=1 | |
f.seek(fsize-bufsize*iter) | |
data.extend(f.readlines()) | |
if len(data) >= lines or f.tell() == 0: | |
print(''.join(data[-lines:])) | |
break |
Implementation of `tail -n k`.
This uses byte offsets and never reads a whole line into memory — imagine a single line that is 10 GB large...
def tail(filename, n):
    """Yield the last abs(*n*) lines of *filename*, one line per yield.

    Scans backwards through the file in fixed-size pages, recording the
    byte offset at which each of the last lines starts, then reads only
    those spans — the file is never loaded whole, so even a single huge
    line stays on disk until it is yielded.

    Yields '' (a single empty string) for an empty file or n == 0.
    All yields except possibly the last end with '\n'; the last carries
    whatever the file ends with.
    """
    stat = os.stat(filename)
    if stat.st_size == 0 or n == 0:
        yield ''
        return
    page_size = 4096  # bytes read per backwards step
    offsets = []
    count = _n = n if n >= 0 else -n
    # Start one byte before EOF so a trailing '\n' is not counted as
    # terminating an extra, empty line.
    last_byte_read = last_nl_byte = starting_offset = stat.st_size - 1
    newline = ord('\n')
    # Binary mode: seeking to arbitrary offsets is only defined for
    # 'rb', and byte arithmetic in text mode miscounts multi-byte
    # characters (the original opened in 'r').
    with open(filename, 'rb') as f:
        while count > 0:
            starting_byte = last_byte_read - page_size
            if last_byte_read == 0:
                # Reached the start of file before finding _n newlines:
                # the file's first line (offset 0) is part of the tail.
                offsets.append(0)
                break
            elif starting_byte < 0:
                # Fewer than page_size bytes remain; read them all.
                f.seek(0)
                text = f.read(last_byte_read)
            else:
                f.seek(starting_byte)
                text = f.read(page_size)
            # Walk the page backwards, recording each line start
            # (the byte just after a newline).
            for i in range(-1, -1 * len(text) - 1, -1):
                last_byte_read -= 1
                if text[i] == newline:
                    last_nl_byte = last_byte_read
                    starting_offset = last_nl_byte + 1
                    offsets.append(starting_offset)
                    count -= 1
                    if count == 0:
                        # BUGFIX: the original kept scanning here, so a
                        # page with extra newlines appended offsets of
                        # *earlier* lines, and the slice below then kept
                        # the wrong (earliest) ones.
                        break
    # Keep the last _n line starts and order them oldest-first.
    offsets = offsets[len(offsets) - _n:]
    offsets.reverse()
    with open(filename, 'rb') as f:
        for i, offset in enumerate(offsets):
            f.seek(offset)
            if i == len(offsets) - 1:
                # final span: read to EOF
                yield f.read().decode()
            else:
                # each span ends where the next line begins, so every
                # decoded chunk is a whole line and UTF-8 safe
                bytes_to_read = offsets[i + 1] - offset
                yield f.read(bytes_to_read).decode()
# Demo: print the last 10 lines of a sample file, stripped of
# surrounding whitespace.
filename = '/tmp/test.txt'
for line in tail(filename, 10):
    print(line.strip())
@amit, there is a small flaw here. If `f.seek(fsize - bufsize*iter)` lands in the middle of the first of the N lines to be printed, the first item of `data[-lines:]` can start anywhere inside that line.
Say the line is "John Johny yes pappa" — `data[-lines:][0]` can be "ny yes pappa", since `f.seek(fsize - bufsize*iter)` can land at any byte offset.
Changing `if len(data) >= lines or f.tell() == 0:` to `if len(data) > lines or f.tell() == 0:` should fix the issue.
Python 3.8 asyncio version,
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# >>
# Blake VandeMerwe, LiveViewTech
# <<
import asyncio
import io
import os
import time

from functools import partial
from typing import AsyncIterator
LINE_BUFFER = 1
async def tail(
    filename: str,
    last_lines: int = 10,
    non_exist_max_secs: float = 30.0,
    fp_poll_secs: float = 0.125
) -> AsyncIterator[str]:
    """Continuously tail a file pointer yielding one line at a time.

    First yields up to *last_lines* existing lines (rstripped), then
    polls the file every *fp_poll_secs* seconds, yielding new lines as
    they are appended.  Handles rotation (same name, new inode) and
    truncation (file shrank).  Gives up after *non_exist_max_secs*
    seconds if the file never appears.

    NOTE(review): this uses time.monotonic() but `time` is not in the
    module's import block — confirm it is imported, or this raises
    NameError the first time wait_exists() runs.
    """
    async def wait_exists() -> bool:
        """Wait for a file to exist, the return statement reflects
        whether or not the file existed when the timeout limits were reached."""
        bail_at: float = time.monotonic() + non_exist_max_secs
        while not os.path.exists(filename):
            if time.monotonic() >= bail_at:
                return False
            await asyncio.sleep(fp_poll_secs)
        return True

    async def check_rotate(_fp) -> io.TextIOBase:
        """Determine if the file rotated in place; same name different inode."""
        nonlocal fino
        if os.stat(filename).st_ino != fino:
            # swap to a handle on the new inode and remember it
            new_fp = open(filename, 'r')
            _fp.close()
            new_fp.seek(0, os.SEEK_SET)
            fino = os.fstat(new_fp.fileno()).st_ino
            return new_fp
        return _fp

    # ~~
    if not await wait_exists():
        return

    # reusable accumulation buffer for partial reads
    buff = io.StringIO()
    stat = os.stat(filename)

    fino: int = stat.st_ino          # inode — rotation detection
    size: int = stat.st_size         # last observed size
    blocksize: int = os.statvfs(filename).f_bsize  # filesystem block size

    fp = open(filename, 'r', LINE_BUFFER)

    if last_lines > 0:
        if stat.st_size <= blocksize:
            # file fits in one filesystem block: read all the lines
            for line in fp.readlines()[-last_lines::]:
                yield line.rstrip()
        else:
            # if the file is larger than one block, seek one block from
            # the end and return all the lines except the (potential)
            # half-line first element and the null-terminated extra
            # line at the end.
            fp.seek(os.stat(fp.fileno()).st_size - blocksize)
            for line in fp.readlines()[1:-1][-last_lines::]:
                yield line.rstrip()

    # seek to the end of the file for tailing
    # given the above operations we should already be there.
    fp.seek(0, os.SEEK_END)

    try:
        while True:
            # wait for the file to exist -- generously
            if not os.path.exists(filename):
                if not await wait_exists():
                    return

            fp = await check_rotate(fp)
            n_stat = os.fstat(fp.fileno())
            n_size = n_stat.st_size

            # if the file is the same size, churn
            # .. this could be error-prone on small files that
            # rotate VERY fast, but that's an edge case for
            # tailing a persistent log file.
            if n_size == size:
                await asyncio.sleep(fp_poll_secs)
                continue

            # if the file shrank (truncated), seek to the beginning
            if n_size < size:
                fp.seek(0, os.SEEK_SET)

            size = n_size
            # drain everything new, block by block, into the buffer
            for chunk in iter(partial(fp.read, blocksize), ''):
                buff.write(chunk)

            buff.seek(0, os.SEEK_SET)

            for line in buff.readlines():
                yield line.rstrip()

            # resize our string buffer
            buff.truncate(0)

    except IOError:
        buff.close()
        fp.close()
if __name__ == '__main__':
    async def main():
        """Print each line tailed from the log file as it appears."""
        async for line in tail(r'/etc/foldingathome/log.txt'):
            print(line)

    try:
        # asyncio.run() (3.7+) creates, runs, and closes the loop for
        # us.  The original's get_event_loop()/run_until_complete()
        # pattern left `loop` undefined if loop creation raised, and
        # called loop.stop()/loop.close() at module level after the
        # KeyboardInterrupt handler.
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
This is great, but you don't need the `# -*- coding: utf-8 -*-` line in Python 3 — UTF-8 is already the default source encoding.
Python 3.8 asyncio version,
#! /usr/bin/env python # -*- coding: utf-8 -*- ...
@therumbler TIL, thanks! I've been using the same "new file" template for years. Time to update!
The second implementation breaks for empty files, causing an infinite loop. This can be solved by adding a guard such as
`if fsize < 1: sys.exit(0)`
before opening the file (a bare `return` is invalid at module level).