-
-
Save bagrow/825522 to your computer and use it in GitHub Desktop.
MONitor & EXecute
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
MONitor & EXecute | |
Yong-Yeol Ahn (http://yongyeol.com/) | |
This script monitors multiple files and executes the given command when | |
any of those files is changed. | |
Modified by Jim Bagrow, 2011-02-17 | |
""" | |
import sys, os, time, signal | |
from glob import glob | |
from optparse import OptionParser | |
from itertools import izip | |
import tty | |
from select import select | |
# Short aliases for writing to and flushing standard output.
ssw, ssf = sys.stdout.write, sys.stdout.flush
def end_handler(signum, frame):
    """Print a farewell message and exit the process with status 0.

    Takes the (signum, frame) arguments of a signal handler; neither is
    used.

    Raises SystemExit (via sys.exit) unconditionally.
    """
    # A parenthesised single-argument print gives the same output under the
    # Python 2 print statement and the Python 3 print function.
    print("\n=== Terminating monex... Have a nice day! ===")
    sys.exit(0)
class TerminalFile:
    """Wrap a terminal file so single key presses can be polled.

    Adapted from http://code.activestate.com/recipes/203830/

    On construction the terminal's current attributes are saved and the
    ECHO and ICANON flags are cleared; the saved attributes are written
    back when the object is destroyed.
    """

    def __init__(self, infile):
        """Switch *infile* into no-echo, non-canonical mode.

        infile -- an open file object attached to a terminal
                  (for example sys.stdin).

        Raises ValueError (an Exception subclass) if *infile* is not a tty.
        """
        if not infile.isatty():
            raise ValueError("TerminalFile requires a file attached to a tty")
        self.file = infile

        # prepare for getch: remember the current settings, then clear the
        # ECHO and ICANON bits in the local-mode flags (element 3 of the
        # attribute list returned by tcgetattr).
        self.save_attr = tty.tcgetattr(self.file)
        newattr = self.save_attr[:]
        newattr[3] &= ~tty.ECHO & ~tty.ICANON
        tty.tcsetattr(self.file, tty.TCSANOW, newattr)

    def __del__(self):
        """Restore the terminal attributes saved by __init__, if any."""
        # __del__ also runs for instances whose __init__ raised before the
        # attributes were saved; there is nothing to restore in that case.
        saved = getattr(self, "save_attr", None)
        if saved is None:
            return
        # restoring stdin.  The original author noted this local import is
        # required here -- presumably because module globals may already be
        # torn down when __del__ runs at interpreter exit.
        import tty
        tty.tcsetattr(self.file, tty.TCSADRAIN, saved)

    def getch(self):
        """Return one pending character from the file, or "" if none is ready.

        select() is called with a zero timeout, so this never blocks
        waiting for input to arrive.
        """
        if select([self.file], [], [], 0)[0]:
            return self.file.read(1)
        return ""
def do(command):
    """Run *command* via os.system, bracketed by a timestamped status line."""
    stamp = time.strftime('%X %Y-%m-%d')
    ssw("[%s] Running `%s'..." % (stamp, command))
    ssf()
    os.system("%s" % command)
    ssw(' done\n')
    ssf()
def get_filelist(args):
    """Expand every argument as a glob pattern.

    Returns the matching paths as a sorted list with duplicates removed.
    """
    matches = set(path for pattern in args for path in glob(pattern))
    return sorted(matches)
def parse_command(cmd, files):
    """Expand the special ``%`` placeholder in *cmd* when it applies.

    If exactly one file is given and the command, ignoring surrounding
    whitespace, ends with ``%``, every ``%`` in the command is replaced by
    that file's name.  In all other cases *cmd* is returned unchanged.
    """
    # endswith() instead of indexing [-1]: an empty or whitespace-only
    # command must pass through unchanged rather than raise IndexError.
    if len(files) == 1 and cmd.strip().endswith("%"):
        cmd = cmd.replace("%", files[0])
    return cmd
def pretty_files_str(files):
    """Return a display string for the watched files.

    The names are joined with ", "; if that string is longer than 75
    characters only the first and last names are shown, separated by
    "[...]".
    """
    joined = ", ".join(files)
    if len(joined) <= 75:
        return joined
    first, last = files[0], files[-1]
    return "%s [...] %s" % (first, last)
def clear_line():
    """Overwrite the first 100 columns of the current line with spaces.

    A carriage return is written before and after the spaces, so the
    cursor ends up at the start of the line.
    """
    blanks = " " * 100
    ssw("\r%s\r" % blanks)
    ssf()
def print_paused(paused, files_str):
    """Write the status banner: paused, or watching *files_str*."""
    banner = "=== Paused ===" if paused else "=== Watching %s ===" % files_str
    ssw(banner)
    ssf()
if __name__ == '__main__':
    # Script entry point: parse the command line, then poll the watched
    # files and the keyboard until the user quits.
    usage = 'usage: %prog -c "command" file1 file2 ...'
    parser = OptionParser(usage=usage)
    parser.add_option("-c", "--command", dest='command',
                      help="command to be executed")
    (options, args) = parser.parse_args()
    if not options.command:
        parser.error('-c option is needed.')

    # Resolve the watch list before the terminal mode is changed, so a
    # usage error exits with the tty still in its normal state.  The list
    # is built with glob, so an empty result means nothing exists to watch.
    files = get_filelist(args)
    if not files:
        parser.error('no existing files match the given arguments.')
    files_str = pretty_files_str(files)
    command = parse_command(options.command.strip('"'), files)

    sleep_time = 0.2  # seconds to sleep between polls
    signal.signal(signal.SIGINT, end_handler)
    print("MONEX: press p to pause/unpause, r/f to force the command, and q to quit...")
    s = TerminalFile(sys.stdin)
    paused = False

    # -1 differs from any mtime of an existing file, so the command is run
    # once on the first pass through the loop.
    old_timestamps = [-1 for _ in files]
    while True:
        char = s.getch()
        if char == 'q':
            end_handler(0, 0)
        elif char == 'p':
            clear_line()
            paused = not paused
            print_paused(paused, files_str)
        elif char in ('r', 'f'):
            clear_line()
            do(command)
            print_paused(paused, files_str)

        if not paused:
            curr_timestamps = []
            for path, previous in zip(files, old_timestamps):
                try:
                    curr_timestamps.append(os.stat(path).st_mtime)
                except OSError:
                    # The file cannot be stat'ed right now (for example it
                    # was removed or is being replaced).  Keep the previous
                    # value so the watcher neither crashes nor fires; it
                    # fires once the file is back with a different mtime.
                    curr_timestamps.append(previous)
            if curr_timestamps != old_timestamps:
                clear_line()
                do(command)
                print_paused(paused, files_str)
                old_timestamps = curr_timestamps

        time.sleep(sleep_time)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Now supports keypresses to pause/unpause/quit