Skip to content

Instantly share code, notes, and snippets.

@Deepwalker
Created April 23, 2015 06:58
Show Gist options
  • Save Deepwalker/8f563a4c66b875c8f1b5 to your computer and use it in GitHub Desktop.
Save Deepwalker/8f563a4c66b875c8f1b5 to your computer and use it in GitHub Desktop.
import sys
import time
import threading
import queue
from subprocess import Popen
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
# Watchdog event types that are allowed to trigger a restart.
EVENTS = ['modified', 'deleted', 'created', 'moved']


class Watcher:
    """Turn file-system events into a pending command for the process manager.

    The class object itself (not an instance) is registered with the
    observer, which is why all state lives in class attributes and
    ``dispatch`` is a classmethod.
    """

    # Guards every read and write of ``cmd``.
    lock = threading.Lock()
    # Pending command: None (nothing to do), 'restart' or 'terminate'.
    cmd = None
    # Only paths ending in one of these suffixes trigger a restart. The
    # leading dot matters: without it any name that merely ends in "py"
    # (e.g. "happy") would match.
    suffixes = ('.py', '.bs')

    @classmethod
    def dispatch(cls, event):
        """Request a restart when a watched file changes.

        ``event`` must expose ``event_type`` and ``src_path`` (plus
        ``dest_path`` for 'moved' events). Events whose type is not listed
        in ``EVENTS`` or whose path does not end in one of ``suffixes`` are
        ignored.
        """
        print(event, event.event_type, event.src_path)
        if event.event_type not in EVENTS:
            return
        # For a move, the destination is the file that now exists.
        if event.event_type == 'moved':
            changed = str(event.dest_path)
        else:
            changed = str(event.src_path)
        if not changed.endswith(cls.suffixes):
            return
        with cls.lock:
            # A late file event must not cancel a pending shutdown request.
            if cls.cmd != 'terminate':
                cls.cmd = 'restart'
def manage_process(args):
    """Run ``args`` as a child process and restart it on request.

    Polls ``Watcher.cmd`` about once per second. On 'restart' the child is
    terminated and launched again; on 'terminate' the child is terminated
    and the function returns.

    Args:
        args: Command and arguments, passed unchanged to ``Popen``.
    """
    process = Popen(args)
    while True:
        # Take and clear the pending command in one critical section.
        # Clearing it later, in a separate lock acquisition, could wipe out
        # a 'terminate' posted while the child was being restarted.
        with Watcher.lock:
            cmd, Watcher.cmd = Watcher.cmd, None
        if cmd is None:
            time.sleep(1)
            continue
        if cmd in ('restart', 'terminate'):
            process.terminate()
            code = process.wait()
            print('Process terminated with %s' % code)
        if cmd == 'terminate':
            break
        process = Popen(args)
        # Pause before polling again so a burst of events arriving during
        # the restart is handled as a single follow-up command.
        time.sleep(1)
if __name__ == "__main__":
    # Usage: <script> PATH CMD [ARG ...]
    # Watches PATH recursively and restarts CMD when a watched file changes.
    cmd = sys.argv[2:]
    if not cmd:
        print('Must supply path and cmd')
        # A missing argument is a usage error, so exit with a failure status.
        sys.exit(1)
    # A non-empty cmd guarantees that sys.argv[1] is present.
    path = sys.argv[1]
    observer = Observer()
    print(observer)
    observer.schedule(Watcher, path, recursive=True)
    observer.start()
    # start(), not run(): run() would execute manage_process in this thread
    # and never return, so the Ctrl-C handling below would be unreachable.
    thread = threading.Thread(target=manage_process, args=(cmd,))
    thread.start()
    try:
        # Idle until the user presses Ctrl-C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    # Wait for the observer to finish before requesting shutdown, so that no
    # late file event can overwrite the 'terminate' command.
    observer.join()
    with Watcher.lock:
        Watcher.cmd = 'terminate'
    thread.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment