Skip to content

Instantly share code, notes, and snippets.

@samuelcolvin
Created May 10, 2016 18:53
Show Gist options
  • Save samuelcolvin/abe2ae10120197141434836c3f61c564 to your computer and use it in GitHub Desktop.
Save samuelcolvin/abe2ae10120197141434836c3f61c564 to your computer and use it in GitHub Desktop.
import os
import signal
from fnmatch import fnmatch
from multiprocessing import Process
from pathlib import Path
from datetime import datetime
import pyinotify as ino
# specific to jetbrains I think, very annoying if not ignored
PY_FILE = '*.py'
def serve():
from app.run import app
loop = app.loop
handler = app.make_handler(access_log_format='[%t] %r %s %b')
srv = app.loop.run_until_complete(loop.create_server(handler, '0.0.0.0', 8001))
try:
loop.run_forever()
except KeyboardInterrupt: # pragma: no branch
pass
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.shutdown())
loop.run_until_complete(handler.finish_connections(4))
loop.run_until_complete(app.cleanup())
loop.close()
class EventHandler:
_process = None
def __init__(self):
self._build_time = datetime.now()
self._start_process()
def __call__(self, event):
if not fnmatch(event.name, PY_FILE):
return
n = datetime.now()
since_build = (n - self._build_time).total_seconds()
if since_build <= 1:
return
self._build_time = n
print('restarting server')
self.stop_process()
self._start_process()
def _start_process(self):
self._process = Process(target=serve)
self._process.start()
def stop_process(self):
if self._process.is_alive():
os.kill(self._process.pid, signal.SIGINT)
self._process.join(5)
else:
print('process already dead %s' % self._process)
wm = ino.WatchManager()
event_handler = EventHandler()
notifier = ino.Notifier(wm, event_handler)
WATCH_DIR = (Path(__file__).parent / 'app').absolute()
events = ino.IN_MODIFY | ino.IN_ATTRIB | ino.IN_CLOSE_WRITE | ino.IN_DELETE | ino.IN_MOVED_TO
wm.add_watch(WATCH_DIR.path, events, rec=True)
try:
notifier.loop()
except KeyboardInterrupt:
pass
finally:
event_handler.stop_process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment