Last active
October 12, 2015 19:38
-
-
Save cynici/4076879 to your computer and use it in GitHub Desktop.
Monitor directories and take action on new files, for Python 2.7+
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.pyc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Author: [email protected]
# Version: 20130307c
# Source: https://gist.github.com/cynici/4076879
# Requires: Python 2.6+, pyinotify
#
import datetime
import fnmatch
import logging
import os
import re
import shlex
import shutil
import subprocess
import sys
import textwrap
from optparse import OptionParser, IndentedHelpFormatter

import pyinotify
try:
    import simplejson
except ImportError:
    # The standard-library json module (Python 2.6+) offers the same load() API.
    import json as simplejson
# Program description passed to OptionParser(description=...) in main().
# It is rendered by IndentedHelpFormatterWithNL, which keeps these line breaks.
help_text = """Monitor directories in *local* filesystem for new files and do something.
Example configuration file in JSON format:
{
    "/data/EUMETSAT_Data_Channel_2": ["dir1", "script2 option"],
    "/data/EUMETSAT_Data_Channel_10": "mod3",
    "#/example/commented_entry": "this entry will be ignored.",
    "/raid/pub/gsfcdata/npp/viirs/level2": {
        "include": "FireLoc*.txt",
        "action": "/usr/bin/ncftpput -u ftpuser -p password ftpserver ."
    }
}
Eg1: "dir1" is a directory to which new files will be copied.
Eg2: "script2" is *absolute* pathname of an executable to run with new file as argument.
Eg3: "mod3" is user-defined Python module in dirmon_action/mod3.py in
     which Mod3FileProcessor(dir,file,fullpath,logger,**kwargs) is defined.
     "dirmon_action" directory must be located in same directory %prog
     or one of PYTHONPATH and contains __init__.py
Eg4: Commented entry
Eg5: Filter new files and execute ncftpput. Other keywords are:
     "exclude": opposite of "include"
     "excludere": matching with regular expression, as is "includere"
     "mask": sum of pyinotify.IN_* integer value
     "recursive": directory to be watched recursively
     "shell": run command in a shell
     "wait": wait for command to complete execution"""
class IndentedHelpFormatterWithNL(IndentedHelpFormatter):
    """optparse help formatter that keeps the newlines written in the text.

    Both the program description and each option's help string are split on
    "\\n" and every resulting paragraph is wrapped on its own, so explicit
    line breaks survive formatting.
    """

    def format_description(self, description):
        """Return *description* wrapped paragraph by paragraph.

        Returns "" for an empty description; otherwise the wrapped text
        followed by a single trailing newline.
        """
        if not description:
            return ""
        desc_width = self.width - self.current_indent
        indent = " " * self.current_indent
        formatted_bits = [
            textwrap.fill(bit,
                          desc_width,
                          initial_indent=indent,
                          subsequent_indent=indent)
            for bit in description.split("\n")
        ]
        return "\n".join(formatted_bits) + "\n"

    def format_option(self, option):
        """Return the formatted help entry for *option*.

        The entry consists of two parts:
          * the opt strings and metavars
            eg. ("-x", or "-fFILENAME, --file=FILENAME")
          * the user-supplied help string
            eg. ("turn on expert mode", "read data from FILENAME")

        If possible, both are written on the same line:
          -x      turn on expert mode

        But if the opt string list is too long, the help string goes on a
        second line, indented to the same column it would start in if it
        fit on the first line:
          -fFILENAME, --file=FILENAME
                  read data from FILENAME
        """
        result = []
        opts = self.option_strings[option]
        opt_width = self.help_position - self.current_indent - 2
        if len(opts) > opt_width:
            opts = "%*s%s\n" % (self.current_indent, "", opts)
            indent_first = self.help_position
        else:
            # Start help on the same line as opts.  opt_width reserves a
            # two-character gutter, so two spaces are needed here for the
            # first help line to start exactly at help_position.
            opts = "%*s%-*s  " % (self.current_indent, "", opt_width, opts)
            indent_first = 0
        result.append(opts)
        if option.help:
            help_text = self.expand_default(option)
            help_lines = []
            for para in help_text.split("\n"):
                # textwrap.wrap() returns [] for an empty or blank paragraph;
                # keep it as an empty line so blank lines are preserved and
                # help_lines can never be empty.
                help_lines.extend(textwrap.wrap(para, self.help_width) or [""])
            result.append("%*s%s\n" % (indent_first, "", help_lines[0]))
            result.extend(["%*s%s\n" % (self.help_position, "", line)
                           for line in help_lines[1:]])
        elif opts[-1] != "\n":
            result.append("\n")
        return "".join(result)
class EventHandler(pyinotify.ProcessEvent):
    """pyinotify event processor that forwards events to a callback.

    Handlers are defined for IN_CLOSE_WRITE and IN_MOVED_TO; each one logs
    the event's pathname at debug level and then calls the dispatch function
    with the event object.
    """

    def __init__(self, dispatchfunc, logger=None):
        """Store the dispatch callback and logger.

        dispatchfunc -- callable invoked as dispatchfunc(event)
        logger       -- optional logger; defaults to the 'EventHandler' logger

        Raises AssertionError when dispatchfunc is not callable.
        """
        if not callable(dispatchfunc):
            raise AssertionError("EventHandler constructor requires a callable dispatch function")
        self.logger = logger if logger else logging.getLogger('EventHandler')
        self.dispatchfunc = dispatchfunc

    def process_IN_CLOSE_WRITE(self, event):
        """Log and dispatch an IN_CLOSE_WRITE event."""
        self.logger.debug("Saved: %s", event.pathname)
        self.dispatchfunc(event)

    def process_IN_MOVED_TO(self, event):
        """Log and dispatch an IN_MOVED_TO event."""
        self.logger.debug("Moved to: %s", event.pathname)
        self.dispatchfunc(event)
def match_filename(filename, patterns, is_casesensitive=False):
    """Return True if *filename* matches any shell-style pattern in *patterns*.

    filename         -- name to test; an empty value never matches
    patterns         -- a single pattern string or an iterable of patterns;
                        an empty value never matches
    is_casesensitive -- True uses fnmatch.fnmatchcase; False uses
                        fnmatch.fnmatch, whose case handling follows the
                        operating system's filename rules
    """
    if not filename or not patterns:
        return False
    # A lone pattern must be wrapped in a list.  Test for string types
    # explicitly: Python 3 strings are iterable, so relying on '__iter__'
    # would match the filename against each character of the pattern.
    try:
        string_types = (basestring,)  # Python 2: str and unicode
    except NameError:
        string_types = (str, bytes)  # Python 3
    if isinstance(patterns, string_types) or not hasattr(patterns, '__iter__'):
        patterns = [patterns]
    fnfunc = fnmatch.fnmatchcase if is_casesensitive else fnmatch.fnmatch
    return any(fnfunc(filename, pat) for pat in patterns)
class DirmonConfig:
    """Map watch directories to the actions to run on their new files.

    self.dir2func maps each watch directory to a dictionary with:
      'actiondics' -- list of action dictionaries, each holding 'type'
                      ('directory', 'script' or 'module'), 'func' and any of
                      the optional per-action keywords
      'mask'       -- optional integer event mask for the directory
      'recursive'  -- optional flag passed to the watch manager
    """

    # Optional per-action keywords copied from an action dictionary.
    _ACTION_KEYWORDS = ('include', 'includere', 'exclude', 'excludere',
                        'kwargs', 'wait', 'shell')

    def __init__(self, file, exclude_regexps=None, logger=None):
        """Requires pathname of JSON configuration file.

        file            -- pathname of the JSON configuration file
        exclude_regexps -- one regular expression or an iterable of them;
                           filenames matching any of them (case-insensitive)
                           are ignored by do_actions()
        logger          -- optional logger; defaults to 'DirmonConfig'

        Entries whose key starts with '#' are skipped as comments, and
        entries whose directory does not exist are logged and skipped.
        """
        self.logger = logger if logger else logging.getLogger('DirmonConfig')
        # Close the configuration file as soon as it has been parsed.
        with open(file) as fh:
            self.dic = simplejson.load(fh)
        self.logger.debug("Loaded configuration from %s" % file)
        self.file = file
        self.dir2func = {}
        self.exclude_re = []
        if exclude_regexps:
            if self._is_scalar(exclude_regexps):
                exclude_regexps = [exclude_regexps]
            for pattern in exclude_regexps:
                self.exclude_re.append(re.compile(pattern, re.I))
        # Populate a dictionary to map every watch-directory to a list of
        # actions.  An action may be:
        #   directory
        #   executable
        #   command list
        #   Python module name
        #   dictionary
        for dir, actions in self.dic.items():
            if dir.startswith('#'):
                # Ignore comments
                continue
            dir = os.path.abspath(dir)  # normalise it
            if not os.path.isdir(dir):
                self.logger.error("Invalid watch directory %s in configuration %s." % (dir, self.file))
                continue
            self.set_dir_actions(dir, actions)

    @staticmethod
    def _is_scalar(value):
        """Return True when *value* is a string or is not iterable."""
        try:
            string_types = (basestring,)  # Python 2: str and unicode
        except NameError:
            string_types = (str, bytes)  # Python 3
        return isinstance(value, string_types) or not hasattr(value, '__iter__')

    @staticmethod
    def _shell_quote(arg):
        """Return *arg* quoted so a shell treats it as one literal word."""
        try:
            from shlex import quote  # Python 3.3+
        except ImportError:
            from pipes import quote  # Python 2
        return quote(arg)

    def set_dir_actions(self, dir, actions):
        """Register one action, or each action of a list, for *dir*."""
        if not isinstance(actions, list):
            actions = [actions]
        for act in actions:
            self.set_action(dir, act)
        self.logger.debug("dir2func=%s" % self.dir2func)

    def set_action(self, dir, action):
        """Determine type of action and make it into a dictionary.

        *action* is either a string or a dictionary whose 'action' entry is
        that string.  The string is classified as, in order: a writable
        directory to copy to, a command line to execute, or the name of a
        module under the "dirmon_action" package.

        Raises ValueError for an invalid action definition.
        """
        if dir not in self.dir2func:
            self.dir2func[dir] = {'actiondics': []}
        dirdic = self.dir2func[dir]
        actiondic = {}
        if isinstance(action, dict):
            # Given a dictionary as action in the configuration file
            if 'action' not in action:
                raise ValueError("Action dictionary for '%s' in configuration %s requires 'action' keyword" % (dir, self.file))
            if 'mask' in action:
                # Optional per watch-directory option
                if 'mask' in dirdic:
                    raise ValueError("Multiple 'mask' defined")
                try:
                    dirdic['mask'] = int(action['mask'])
                except (TypeError, ValueError):
                    raise ValueError("Action dictionary for '%s' in configuration %s has invalid mask '%s'" % (dir, self.file, action['mask']))
            if 'recursive' in action:
                # Optional per watch-directory option
                if 'recursive' in dirdic:
                    raise ValueError("Multiple 'recursive' defined")
                dirdic['recursive'] = action['recursive']
            for kw in self._ACTION_KEYWORDS:
                # Optional per action option; empty values are dropped.
                if kw in action and action[kw]:
                    if kw in ('includere', 'excludere'):
                        actiondic[kw] = re.compile(action[kw], re.I)
                    elif kw == 'kwargs' and not isinstance(action[kw], dict):
                        raise ValueError("'kwargs' for '%s' in configuration %s must be a dictionary" % (dir, self.file))
                    else:
                        actiondic[kw] = action[kw]
            action = action['action']
        # User specified either archive directory or executable file
        if os.path.isdir(action) and os.access(action, os.W_OK):
            actiondic['type'] = 'directory'
            actiondic['func'] = action
            self.logger.info("New files in %s will be copied to %s" % (dir, action))
        else:
            arglist = shlex.split(action)
            if not arglist:
                raise ValueError("Empty action for '%s' in configuration %s" % (dir, self.file))
            if len(arglist) > 1 or os.path.isfile(arglist[0]):
                if os.path.isabs(arglist[0]) and not os.access(arglist[0], os.X_OK):
                    raise ValueError("Action script '%s' for '%s' in %s is not executable" % (action, dir, self.file))
                actiondic['type'] = 'script'
                actiondic['func'] = arglist
                self.logger.info("New files in %s will run '%s'" % (dir, arglist[0]))
            else:
                # Probably a user-defined plugin Python module which must be
                # found in "dirmon_action" directory (under any PYTHONPATH)
                funcname = action.capitalize() + 'FileProcessor'
                try:
                    mod = __import__("dirmon_action." + action, globals(), locals(), [funcname])
                    actiondic['func'] = getattr(mod, funcname)
                    actiondic['type'] = 'module'
                except Exception as err:
                    raise ValueError("Unable to import action module '%s.%s()' for '%s': %s" % (action, funcname, dir, err))
        dirdic['actiondics'].append(actiondic)

    def add_watches(self, wm, default_mask):
        """Add directories to be watched to pyinotify.WatchManager.

        wm           -- watch manager providing add_watch(path, mask, rec=...)
        default_mask -- event mask for directories without their own 'mask'
        """
        for dir, d2fdic in self.dir2func.items():
            mask = d2fdic.get('mask', default_mask)
            rec = d2fdic.get('recursive', False)
            wdd = wm.add_watch(dir, mask, rec=rec)
            # NOTE(review): pyinotify documents add_watch() as returning
            # {path: watch descriptor} with a negative descriptor on
            # failure -- confirm against the installed version.
            if isinstance(wdd, dict):
                for path, wd in wdd.items():
                    if isinstance(wd, int) and wd < 0:
                        self.logger.error("Failed to add watch on %s" % path)
            msg = "Monitor %s [rec=%s mask=%s] with %d actions" % (dir, rec, mask, len(d2fdic['actiondics']))
            self.logger.info(msg)

    def do_actions(self, event):
        """Callback for pyinotify EventHandler.

        Reads event.name and event.pathname, drops files that match a global
        exclusion pattern or no longer exist, then runs every action of the
        owning watch directory whose filters accept the filename.
        """
        file = event.name
        fullpath = event.pathname
        # Match new filename against global exclusion patterns
        for regexp in self.exclude_re:
            if regexp.search(file):
                self.logger.debug("'%s' matches global exclusion pattern - ignored." % fullpath)
                return
        if not os.path.exists(fullpath):
            self.logger.warning("'%s' gone, probably due to race condition - ignored." % fullpath)
            return
        dir = self._find_watch_dir(os.path.dirname(fullpath))
        if dir is None:
            self.logger.error("'%s' has no defined action in %s?? - ignored." % (fullpath, self.file))
            return
        for adic in self.dir2func[dir]['actiondics']:
            if self._is_filtered(adic, file, fullpath):
                continue
            self._run_action(adic, dir, file, fullpath)

    def _find_watch_dir(self, dir):
        """Return the configured watch directory owning *dir*, or None.

        An exact match wins.  Otherwise *dir* may be below a recursively
        watched directory; the prefix must end at a path separator so that
        "/data/in2" is not attributed to "/data/in".  The longest match is
        chosen so the result does not depend on dictionary order.
        """
        if dir in self.dir2func:
            return dir
        best = None
        for candidate in self.dir2func:
            if dir.startswith(os.path.join(candidate, '')):
                if best is None or len(candidate) > len(best):
                    best = candidate
        return best

    def _is_filtered(self, adic, file, fullpath):
        """Return True when the action's include/exclude filters reject *file*."""
        if adic.get('exclude') and match_filename(file, adic['exclude']):
            self.logger.debug("'%s' matches exclude pattern '%s' - ignored." % (fullpath, adic['exclude']))
            return True
        if adic.get('excludere') and adic['excludere'].search(file):
            self.logger.debug("'%s' matches exclude regexp - ignored." % (fullpath))
            return True
        if adic.get('include') and match_filename(file, adic['include']) is False:
            self.logger.debug("'%s' does not match include pattern '%s' - ignored." % (fullpath, adic['include']))
            return True
        if adic.get('includere') and not adic['includere'].search(file):
            self.logger.debug("'%s' does not match include regexp - ignored." % (fullpath))
            return True
        return False

    def _run_action(self, adic, dir, file, fullpath):
        """Run one action for a new file.

        The action is dispatched on the 'type' recorded by set_action():
          'directory' -- copy fullpath into the directory
          'script'    -- execute the argument list with fullpath appended
          'module'    -- call func(dir, file, fullpath, logger, **kwargs)
        Failures of the action itself are logged, not raised.
        """
        action_type = adic['type']
        func = adic['func']
        # Work on a copy so the configured kwargs are never modified.
        kwargs = dict(adic.get('kwargs') or {})
        wait_flag = bool(adic.get('wait'))
        shell_flag = bool(adic.get('shell'))
        if wait_flag:
            kwargs.setdefault('wait', True)
        if shell_flag:
            kwargs.setdefault('shell', True)

        if action_type == 'directory':
            try:
                if wait_flag:
                    shutil.copy(fullpath, func)
                else:
                    subprocess.Popen(['cp', fullpath, func])
                self.logger.info("Copy to %s" % (os.path.join(func, file)))
            except Exception as err:
                self.logger.error("Failed to copy '%s' to '%s': %s" % (fullpath, func, err))
        elif action_type == 'module':
            try:
                self.logger.info("Calling %s(%s, %s) ..." % (func.__name__, fullpath, kwargs))
                retval = func(dir, file, fullpath, self.logger, **kwargs)
                self.logger.debug("%s(%s, %s) returned %s" % (func.__name__, fullpath, kwargs, retval))
            except Exception as err:
                self.logger.error("Failed to call %s(%s): %s" % (func.__name__, fullpath, err))
        elif action_type == 'script':
            try:
                self.logger.info("Executing %s %s [wait=%s, shell=%s]" % (func[0], fullpath, wait_flag, shell_flag))
                if shell_flag:
                    # The filename comes from the watched directory, so quote
                    # it; the configured command words are passed through
                    # unchanged so shell syntax in them keeps working.
                    command = ' '.join(func + [self._shell_quote(fullpath)])
                else:
                    command = func + [fullpath]
                if wait_flag:
                    status = subprocess.call(command, shell=shell_flag)
                    if status == 0:
                        self.logger.debug("Executed %s %s successfully" % (func[0], fullpath))
                    else:
                        self.logger.warning("%s %s exited with status %s" % (func[0], fullpath, status))
                else:
                    subprocess.Popen(command, shell=shell_flag)
            except Exception as err:
                self.logger.error("Failed to execute %s %s [wait=%s, shell=%s]: %s" % (func[0], fullpath, wait_flag, shell_flag, err))
        else:
            raise AssertionError("Can't deal with this kind of action '%s' for '%s'" % (func, fullpath))
class ConfigFile:
    """Locate a configuration file by name and extension.

    Candidates are tried relative to the current directory first and then
    relative to the directory containing this script.
    """

    scriptdir = os.path.dirname(__file__)
    scriptname = os.path.basename(__file__)
    scriptnoext, scriptext = os.path.splitext(scriptname)
    logger = logging.getLogger('ConfigFile')

    @staticmethod
    def _is_scalar(value):
        """Return True when *value* is a string or is not iterable."""
        try:
            string_types = (basestring,)  # Python 2: str and unicode
        except NameError:
            string_types = (str, bytes)  # Python 3
        return isinstance(value, string_types) or not hasattr(value, '__iter__')

    @staticmethod
    def get_default_pathname(file=None, ext=None, returnlist=None):
        """Search for configuration file and return a valid pathname usable by open().

        file       -- a single string or a list of strings; defaults to this
                      script's name without its extension
        ext        -- one extension or a list of extensions appended to each
                      candidate; defaults to no extension
        returnlist -- when a list is given, every candidate pathname is
                      appended to it and nothing is searched (returns None)

        Raises ValueError when a single absolute *file* does not exist with
        any of the extensions, or when no candidate exists.
        """
        if ext is None:
            exts = ['']
        elif ConfigFile._is_scalar(ext):
            exts = [ext]
        else:
            exts = list(ext)

        if not file:
            tryfiles = [ConfigFile.scriptnoext]
        elif ConfigFile._is_scalar(file):
            # Fail early for an absolute path, but only after taking the
            # extensions into account: "/etc/dirmon" with ext ".json" is
            # valid when "/etc/dirmon.json" exists.
            if os.path.isabs(file) and not any(os.path.isfile(file + e) for e in exts):
                raise ValueError("Configuration file %s is non-existent" % file)
            tryfiles = [file]
        else:
            tryfiles = list(file)

        trydirs = ['']
        if ConfigFile.scriptdir:
            trydirs.append(ConfigFile.scriptdir)
        for d in trydirs:
            for f in tryfiles:
                p = os.path.join(d, f)
                for e in exts:
                    if returnlist is not None:
                        returnlist.append(p + e)
                    elif os.path.isfile(p + e):
                        return p + e
        if returnlist is None:
            raise ValueError("Cannot find configuration file %s" % file)

    def __init__(self, file=None, ext=None):
        """Remember the default *file* and *ext* used by get_pathname()."""
        self.file = file
        self.ext = ext

    def get_pathname(self, file=None, ext=None, returnlist=None):
        """Call get_default_pathname(), defaulting to the stored file and ext."""
        if file is None:
            file = self.file
        if ext is None:
            ext = self.ext
        return ConfigFile.get_default_pathname(file=file, ext=ext, returnlist=returnlist)
def main(argv=None):
    """Parse the command line, load the configuration and run the notifier loop.

    argv -- full argument vector including the program name; defaults to
            sys.argv

    Returns 0 when the notifier loop ends.  Invalid options or an unusable
    configuration are reported through parser.error().
    """
    if argv is None:
        argv = sys.argv
    debuglevelD = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    defvals = {
        'loglevel': 'info',
        'logfile': '/dev/stderr',
    }
    loglevels = sorted(debuglevelD)

    # Collect the candidate configuration pathnames to show in the help text.
    trycfgfiles = []
    ConfigFile.get_default_pathname(ext=['.ini', '.json'], returnlist=trycfgfiles)

    parser = OptionParser(description=help_text, formatter=IndentedHelpFormatterWithNL())
    parser.add_option("-c", dest="cfgfile", type="string", metavar='FILE',
                      help="Configuration file in JSON format (%s)" % trycfgfiles)
    parser.add_option("-l", dest="loglevel", type="string", metavar='LOGLEVEL',
                      help="Verbosity (%s): %s" % (defvals['loglevel'], loglevels))
    parser.add_option("-d", "--daemonize", dest="daemonize", action="store_true",
                      help="Run in background")
    parser.add_option("--logfile", dest="logfile", type="string", metavar='FILE',
                      help="Log file")
    parser.set_defaults(**defvals)
    # Parse the vector that was passed in rather than always reading sys.argv.
    (options, args) = parser.parse_args(argv[1:])
    if options.loglevel not in debuglevelD:
        parser.error("Verbosity must be one of: %s" % loglevels)
    dbglvl = debuglevelD[options.loglevel]

    logger = logging.getLogger()
    logger.setLevel(dbglvl)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s - %(levelname)s - %(message)s'))
    ch.setLevel(dbglvl)
    logger.addHandler(ch)

    # Truncate logfile
    if os.path.isfile(options.logfile):
        try:
            open(options.logfile, 'w').close()
        except (IOError, OSError) as err:
            parser.error("Failed to truncate existing log file '%s': %s" % (options.logfile, err))

    # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
    # http://seb-m.github.com/pyinotify/pyinotify-pysrc.html#Notifier
    try:
        if not options.cfgfile:
            options.cfgfile = ConfigFile.get_default_pathname(ext=['.ini', '.json'])
        # EUMETCAST software, rsync, etc. writes to temporary files and rename
        # when they are complete.  Incomplete temporary filenames match these
        # patterns.
        dmconfig = DirmonConfig(options.cfgfile, exclude_regexps=(r'^\.', r'^temp', r'^tmp',))
    except Exception as err:
        parser.error("Failed to load configuration '%s': %s" % (options.cfgfile, err))

    logger.info("%s starting using configuration %s" % (os.path.basename(__file__), options.cfgfile))
    wm = pyinotify.WatchManager()  # Watch Manager
    notifier = pyinotify.Notifier(wm, EventHandler(dmconfig.do_actions))
    default_mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    dmconfig.add_watches(wm, default_mask)
    notifier.loop(daemonize=options.daemonize, pid_file=False, stdout=options.logfile, stderr=options.logfile)
    return 0
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment