Skip to content

Instantly share code, notes, and snippets.

@davisp
Created May 21, 2010 20:18
Show Gist options
  • Save davisp/409366 to your computer and use it in GitHub Desktop.
Save davisp/409366 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
# Based on the Notifier example from tutorial
# See: http://trac.dbzteam.org/pyinotify/wiki/Tutorial
import optparse as op
import Queue
import subprocess as sp
import time
import pyinotify
__usage__ = "%prog [OPTIONS] DIR1 [DIR2 ...]"
class EventHandler(pyinotify.ProcessEvent):
def __init__(self, q):
self.q = q
def process_default(self, event):
if event.name.startswith("."):
return
self.q.put(event.pathname)
def run(opts, args, wm, notices):
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_MODIFY
for a in args:
wm.add_watch(a, mask, rec=True)
when = None
while True:
try:
path = notices.get(timeout=1)
except Queue.Empty:
path = None
if path is not None:
print "=> %s" % path
when = time.time()
if when and time.time() - when > opts.wait:
print "Executing..."
sp.check_call(opts.command, shell=True)
when = None
def options():
return [
op.make_option("-c", dest="command", default=None,
help = "A command to execute."),
op.make_option("-w", dest="wait", default=1, type="int",
help = "Number of seconds to wait before executing."),
]
def main():
parser = op.OptionParser(usage=__usage__, option_list=options())
opts, args = parser.parse_args()
if not len(args):
args = ["."]
notices = Queue.Queue()
wm = pyinotify.WatchManager()
handler = EventHandler(notices)
notifier = pyinotify.ThreadedNotifier(wm, handler)
notifier.start()
try:
run(opts, args, wm, notices)
except KeyboardInterrupt:
pass
finally:
notifier.stop()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment