Created
March 16, 2019 06:53
-
-
Save MU-Software/cc7df56514a87ee76538c972aef1fe54 to your computer and use it in GitHub Desktop.
Python script auto reloader (directory watcher)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import sys | |
| import time | |
| import hashlib | |
| import pathlib as pt | |
| import subprocess as sp | |
| from watchdog.observers import Observer | |
| from watchdog.events import PatternMatchingEventHandler | |
| modified_record = {} | |
| cur_sp_app = None | |
| execute_cmd = [ | |
| sys.executable, pt.Path('app.py').as_posix()] | |
| def hash_bytestr_iter(bytesiter, hasher, ashexstr=False): | |
| for block in bytesiter: | |
| hasher.update(block) | |
| return hasher.hexdigest() if ashexstr else hasher.digest() | |
| def file_as_blockiter(afile, blocksize=65536): | |
| with afile: | |
| block = afile.read(blocksize) | |
| while len(block) > 0: | |
| yield block | |
| block = afile.read(blocksize) | |
| def md5_file(fname): | |
| hash_md5 = hashlib.md5() | |
| with open(fname, "rb") as f: | |
| for chunk in iter(lambda: f.read(4096), b""): | |
| hash_md5.update(chunk) | |
| return hash_md5.hexdigest() | |
| class DirectoryWatcher(PatternMatchingEventHandler): | |
| patterns=['*.py'] | |
| def process(self, event): | |
| """ | |
| event.event_type | |
| 'modified' | 'created' | 'moved' | 'deleted' | |
| event.is_directory | |
| True | False | |
| event.src_path | |
| path/to/observed/file | |
| """ | |
| if event.event_type == 'deleted': return | |
| global cur_sp_app, modified_record | |
| evt_pt = event.src_path | |
| evt_pt_md5 = md5_file(pt.Path(evt_pt).as_posix()) | |
| evt_pt_mtime = pt.Path(evt_pt).stat().st_mtime | |
| if not modified_record.get(evt_pt, False): | |
| modified_record[evt_pt] = { | |
| 'md5' : evt_pt_md5, | |
| 'mtime' : evt_pt_mtime | |
| } | |
| elif modified_record[evt_pt]['md5'] == evt_pt_md5: | |
| modified_record[evt_pt]['mtime'] = evt_pt_mtime | |
| return | |
| elif evt_pt_mtime - modified_record[evt_pt]['mtime'] < 0.005: | |
| modified_record[evt_pt]['md5'] = evt_pt_md5 | |
| modified_record[evt_pt]['mtime'] = evt_pt_mtime | |
| return | |
| modified_record[evt_pt] = { | |
| 'md5' : evt_pt_md5, | |
| 'mtime' : evt_pt_mtime | |
| } | |
| print(f'{event.src_path} {event.event_type}') | |
| if cur_sp_app != None: | |
| cur_sp_app.kill() | |
| while cur_sp_app.poll() is not None: | |
| print('Killing script now...(retrying)') | |
| cur_sp_app.kill() | |
| print('Script killed, restarting...') | |
| cur_sp_app = None | |
| cur_sp_app = sp.Popen(execute_cmd, close_fds=False) | |
| print('Script restarted!') | |
| on_created = lambda self, event: self.process(event) | |
| on_modified = lambda self, event: self.process(event) | |
| on_moved = lambda self, event: self.process(event) | |
| on_deleted = lambda self, event: self.process(event) | |
| if __name__ == '__main__': | |
| os.system('cls') | |
| if sys.argv: execute_cmd += sys.argv | |
| cur_sp_app = sp.Popen(execute_cmd, close_fds=False) | |
| observer = Observer() | |
| observer.schedule(DirectoryWatcher(), path='.', recursive=False) | |
| observer.start() | |
| try: | |
| while True: | |
| time.sleep(0.5) | |
| except KeyboardInterrupt: | |
| print('autoreload : KeyboardInterrupt') | |
| if cur_sp_app.poll() is not None: | |
| cur_sp_app.kill() | |
| observer.stop() | |
| observer.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment