Created
May 10, 2016 18:53
-
-
Save samuelcolvin/abe2ae10120197141434836c3f61c564 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import signal | |
from fnmatch import fnmatch | |
from multiprocessing import Process | |
from pathlib import Path | |
from datetime import datetime | |
import pyinotify as ino | |
# specific to jetbrains I think, very annoying if not ignored | |
PY_FILE = '*.py' | |
def serve(): | |
from app.run import app | |
loop = app.loop | |
handler = app.make_handler(access_log_format='[%t] %r %s %b') | |
srv = app.loop.run_until_complete(loop.create_server(handler, '0.0.0.0', 8001)) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: # pragma: no branch | |
pass | |
finally: | |
srv.close() | |
loop.run_until_complete(srv.wait_closed()) | |
loop.run_until_complete(app.shutdown()) | |
loop.run_until_complete(handler.finish_connections(4)) | |
loop.run_until_complete(app.cleanup()) | |
loop.close() | |
class EventHandler: | |
_process = None | |
def __init__(self): | |
self._build_time = datetime.now() | |
self._start_process() | |
def __call__(self, event): | |
if not fnmatch(event.name, PY_FILE): | |
return | |
n = datetime.now() | |
since_build = (n - self._build_time).total_seconds() | |
if since_build <= 1: | |
return | |
self._build_time = n | |
print('restarting server') | |
self.stop_process() | |
self._start_process() | |
def _start_process(self): | |
self._process = Process(target=serve) | |
self._process.start() | |
def stop_process(self): | |
if self._process.is_alive(): | |
os.kill(self._process.pid, signal.SIGINT) | |
self._process.join(5) | |
else: | |
print('process already dead %s' % self._process) | |
wm = ino.WatchManager() | |
event_handler = EventHandler() | |
notifier = ino.Notifier(wm, event_handler) | |
WATCH_DIR = (Path(__file__).parent / 'app').absolute() | |
events = ino.IN_MODIFY | ino.IN_ATTRIB | ino.IN_CLOSE_WRITE | ino.IN_DELETE | ino.IN_MOVED_TO | |
wm.add_watch(WATCH_DIR.path, events, rec=True) | |
try: | |
notifier.loop() | |
except KeyboardInterrupt: | |
pass | |
finally: | |
event_handler.stop_process() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment