Created
December 16, 2017 21:30
-
-
Save icedraco/cf2694d7237bd33e562a11fb530ae356 to your computer and use it in GitHub Desktop.
Watch a text file for new content (à la "tail -f"); reopen the file if it appears to have been truncated.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
def watch_fp(fp, delim: str = "\n"):
    """Yield delimiter-terminated units read from *fp*, polling forever.

    Each call to ``fp.read()`` is treated as one poll of the file:

    * When the read returns text, it is appended to an internal buffer.
      Every complete unit (text followed by *delim*) is then yielded
      without the delimiter; the unterminated remainder stays buffered
      until a later read completes it.
    * When the read returns an empty string, ``None`` is yielded to tell
      the caller that no new data is available right now.

    The generator never finishes on its own; the caller decides when to
    stop pulling from it.

    Args:
        fp: An object whose ``read()`` returns ``str`` (e.g. a file opened
            in text mode).
        delim: The unit separator. It may be longer than one character and
            is recognised even when it is split across two reads.

    Yields:
        ``str`` for every completed unit, or ``None`` when a read returned
        no data.
    """
    pending = ""  # text received so far that is not yet terminated by delim
    while True:
        chunk = fp.read()
        if not chunk:
            yield None  # no new data
            continue

        pending += chunk
        # Search the whole pending text rather than just the new chunk, so a
        # delimiter that straddles two reads is still detected.
        if delim in pending:
            *units, pending = pending.split(delim)
            yield from units
def watch(filename: str, poll_interval: float = 1.0):
    """Print every line appended to *filename*, forever (like ``tail -f``).

    The file is opened in text mode and drained through ``watch_fp``. Once
    no more data is available, the current read position is compared with
    the end of the file: if the end is now *before* the read position, the
    file is considered truncated, ``--- RELOADING FILE ---`` is printed and
    the file is reopened from the start. Otherwise the function sleeps for
    *poll_interval* seconds and polls again.

    Any partial (unterminated) line buffered at the moment of a reload is
    discarded together with the old ``watch_fp`` generator.

    Args:
        filename: Path of the text file to follow.
        poll_interval: Seconds to sleep between polls when nothing new was
            found. Defaults to one second.

    This function never returns; interrupt the process to stop it.
    """
    while True:
        with open(filename, 'r') as fp:
            lines = watch_fp(fp)
            truncated = False
            while not truncated:
                # Drain everything currently available; watch_fp yields
                # None as soon as a read comes back empty.
                line = next(lines)
                while line is not None:
                    print(line)
                    line = next(lines)

                # Detect truncation: the end of the file (whence=2) lying
                # before our read position means the file has shrunk.
                position = fp.tell()
                fp.seek(0, 2)
                end_position = fp.tell()
                truncated = end_position < position

                if not truncated:
                    # Go back to where we stopped reading. Without this,
                    # data appended between the last read and the seek to
                    # the end of the file would be skipped silently.
                    fp.seek(position)
                    sleep(poll_interval)
        print("--- RELOADING FILE ---")
def main():
    """Script entry point: follow the file named on the command line."""
    watch(parse_args().filename)
def parse_args(argv=None):
    """Parse the command-line arguments of the file watcher.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. When ``None`` (the default), argparse
            reads ``sys.argv[1:]``, which matches the original behaviour.

    Returns:
        The ``argparse.Namespace`` holding ``filename``, the path of the
        file to watch.
    """
    import argparse

    parser = argparse.ArgumentParser('filewatch', description='file change tracker')
    parser.add_argument('filename', type=str, help="file to watch")
    return parser.parse_args(argv)
# Run the watcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment