Created
August 27, 2022 14:13
-
-
Save ChenyangGao/6b34f43f2cfca03e5e1cec7344c53c98 to your computer and use it in GitHub Desktop.
Python实现tail命令(功能暂时比较简陋)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# coding: utf-8

"""A Python implementation of the ``tail`` command (functionality is still fairly basic)."""

__author__ = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 0, 1)
__all__ = ["tail"]

from sys import stdout
from time import sleep

# Number of bytes `tail` reads per step while scanning a file backwards from its end.
CHUNK_SIZE = 1024
def _last_lines_bytes(stream, end, nlines, chunk_size):
    """Return the raw bytes of the last `nlines` lines stored in ``stream[:end]``.

    :param stream: A seekable binary stream.
    :param end: Byte offset at which the region to inspect ends.
    :param nlines: Number of trailing lines to return; values <= 0 yield ``b""``.
    :param chunk_size: Number of bytes to read per backward step (must be > 0).
    :return: The selected lines joined with ``b"\\n"``; a final newline is kept
        when the inspected region ends with one.
    """
    if nlines <= 0:
        return b""
    chunks = []
    newlines = 0
    pos = end
    # Read backwards until more than `nlines` newlines were seen (or the start
    # of the file is reached). That many newlines guarantee that every line we
    # keep is complete even when the region ends with a newline.
    while pos > 0 and newlines <= nlines:
        step = min(chunk_size, pos)
        pos -= step
        stream.seek(pos)
        chunk = stream.read(step)
        newlines += chunk.count(b"\n")
        chunks.append(chunk)
    lines = b"".join(reversed(chunks)).split(b"\n")
    # After the split, an empty final element means the data ended with "\n";
    # keep that element too so the newline is reproduced on output. Otherwise
    # the final element is an unterminated last line and counts as a line.
    keep = nlines if lines[-1] else nlines + 1
    return b"\n".join(lines[-keep:])


def tail(path, nlines=10, sleep_interval=1.0, file=stdout, encoding=None, *, chunk_size=None):
    """Print the last lines of a file, then keep printing lines appended to it.

    This function does not return on its own: after the initial output it polls
    the file forever, so callers stop it with an exception such as
    ``KeyboardInterrupt``.

    :param path: The path of the file to be opened or an integer file descriptor of the file to be wrapped.
    :param nlines: Output the last `nlines` (default 10) lines. Values <= 0 output nothing initially.
    :param sleep_interval: Sleep for approximately `sleep_interval` seconds (default 1.0) between iterations.
    :param file: The file object to be written into.
    :param encoding: `encoding` is the name of the encoding used to decode or encode the file.
    :param chunk_size: Number of bytes read per step while scanning backwards;
        defaults to the module-level ``CHUNK_SIZE``.
    :raises ValueError: If `chunk_size` is not positive.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    write, flush = file.write, file.flush
    with open(path, encoding=encoding) as f:
        # Resolve the effective encoding (open() picks a default when None).
        encoding = f.encoding
        readline = f.readline
        last_pos = f.seek(0, 2)
        if last_pos:
            # NOTE: lines are located by searching for b"\n" in the raw bytes,
            # which assumes an ASCII-compatible encoding.
            initial = _last_lines_bytes(f.buffer, last_pos, nlines, chunk_size)
            write(initial.decode(encoding))
            flush()
            # The underlying buffer was moved directly; put the text layer
            # back at the position where following should start.
            f.seek(last_pos, 0)
        while True:
            line = readline()
            if line:
                write(line)
                flush()
            else:
                # Nothing new yet: wait before polling again.
                sleep(sleep_interval)
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command-line front end: one positional path plus optional tuning flags.
    cli = ArgumentParser(description="Print the last lines of a file")
    cli.add_argument("path", help="The path of the file to be opened")
    cli.add_argument(
        "-n", "--nlines",
        metavar="NUM",
        type=int,
        default=10,
        help="Output the last NUM (default 10) lines",
    )
    cli.add_argument(
        "-s", "--sleep_interval",
        metavar="N",
        type=float,
        default=1.0,
        help="Sleep for approximately N seconds (default 1.0) between iterations",
    )
    cli.add_argument(
        "-e", "--encoding",
        help="The name of the encoding used to decode or encode the file.",
    )

    # The parsed option names (path, nlines, sleep_interval, encoding) match
    # tail()'s parameter names, so they can be forwarded as keywords.
    options = vars(cli.parse_args())
    try:
        tail(**options)
    except KeyboardInterrupt:
        # An interrupt ends the program quietly, without a traceback.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment