Skip to content

Instantly share code, notes, and snippets.

@cynici
Last active October 12, 2015 19:38
Show Gist options
  • Save cynici/4076879 to your computer and use it in GitHub Desktop.
Save cynici/4076879 to your computer and use it in GitHub Desktop.
Monitor directories and take action on new files, for Python 2.7+
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Author: [email protected]
# Version: 20130307c
# Source: https://gist.github.com/cynici/4076879
# Requires: Python 2.6+, pyinotify
#
import os, sys, datetime, re
from optparse import OptionParser, IndentedHelpFormatter, textwrap
import simplejson
import pyinotify
import logging
import shutil, subprocess, shlex, fnmatch
# Program description passed to OptionParser(description=...) in main().
# It is rendered by IndentedHelpFormatterWithNL.format_description(), which
# wraps every newline-separated line separately, so the line breaks below are
# significant at runtime.
help_text = """Monitor directories in *local* filesystem for new files and do something.
Example configuration file in JSON format:
{
"/data/EUMETSAT_Data_Channel_2": ["dir1", "script2 option"],
"/data/EUMETSAT_Data_Channel_10": "mod3",
"#/example/commented_entry": "this entry will be ignored.",
"/raid/pub/gsfcdata/npp/viirs/level2": {
"include": "FireLoc*.txt",
"action": "/usr/bin/ncftpput -u ftpuser -p password ftpserver ."
}
}
Eg1: "dir1" is a directory to which new files will be copied.
Eg2: "script2" is *absolute* pathname of an executable to run with new file as argument.
Eg3: "mod3" is user-defined Python module in dirmon_action/mod3.py in
which Mod3FileProcessor(dir,file,fullpath,logger,**kwargs) is defined.
"dirmon_action" directory must be located in same directory %prog
or one of PYTHONPATH and contains __init__.py
Eg4: Commented entry
Eg5: Filter new files and execute ncftpput. Other keywords are:
"exclude": opposite of "include"
"excludere": matching with regular expression, as is "includere"
"mask": sum of pyinotify.IN_* integer value
"recursive": directory to be watched recursively
"shell": run command in a shell
"wait": wait for command to complete execution"""
class IndentedHelpFormatterWithNL(IndentedHelpFormatter):
    """optparse help formatter that honours explicit newlines.

    The stock IndentedHelpFormatter re-flows the description and the option
    help into single paragraphs.  This variant wraps every newline-separated
    line on its own, so multi-line descriptions and help strings keep their
    line structure.
    """

    def format_description(self, description):
        """Return *description* with each of its lines wrapped separately.

        Returns an empty string when *description* is empty; otherwise the
        wrapped lines joined by newlines, with a trailing newline.
        """
        if not description:
            return ""
        desc_width = self.width - self.current_indent
        indent = " " * self.current_indent
        wrapped = [
            textwrap.fill(line,
                          desc_width,
                          initial_indent=indent,
                          subsequent_indent=indent)
            for line in description.split('\n')]
        return "\n".join(wrapped) + "\n"

    def format_option(self, option):
        """Return the formatted help entry for a single *option*.

        The entry consists of the option strings (e.g. "-x" or
        "-fFILENAME, --file=FILENAME") followed by the help text.  When the
        option strings fit in the left column the help starts on the same
        line; otherwise it starts on the next line, indented to the help
        column.  Newlines inside the help text start a new output line.
        """
        result = []
        opts = self.option_strings[option]
        opt_width = self.help_position - self.current_indent - 2
        if len(opts) > opt_width:
            # Too long for the left column: help starts on the next line.
            opts = "%*s%s\n" % (self.current_indent, "", opts)
            indent_first = self.help_position
        else:
            # opt_width reserves two columns of padding, so two spaces are
            # needed here for the first help line to start at help_position,
            # the column used for the continuation lines below.
            opts = "%*s%-*s  " % (self.current_indent, "", opt_width, opts)
            indent_first = 0
        result.append(opts)

        help_lines = []
        if option.help:
            expanded = self.expand_default(option)
            for para in expanded.split("\n"):
                help_lines.extend(textwrap.wrap(para, self.help_width))

        if help_lines:
            result.append("%*s%s\n" % (indent_first, "", help_lines[0]))
            result.extend(["%*s%s\n" % (self.help_position, "", line)
                           for line in help_lines[1:]])
        elif opts[-1] != "\n":
            # No help, or help consisting only of whitespace/newlines:
            # just terminate the option line.
            result.append("\n")
        return "".join(result)
class EventHandler(pyinotify.ProcessEvent):
    """pyinotify event processor that forwards events to a callback.

    Both handled event kinds (file closed after writing, file moved into the
    watched directory) are logged at debug level and then passed unchanged
    to the dispatch function given to the constructor.
    """

    def __init__(self, dispatchfunc, logger=None):
        """Store the dispatch callback and the logger.

        dispatchfunc -- callable invoked with the pyinotify event object
        logger       -- optional logger; defaults to the 'EventHandler' logger

        Raises AssertionError when *dispatchfunc* is not callable.
        """
        if not callable(dispatchfunc):
            raise AssertionError("EventHandler constructor requires a callable dispatch function")
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger('EventHandler')
        self.dispatchfunc = dispatchfunc

    def process_IN_CLOSE_WRITE(self, event):
        """Forward an IN_CLOSE_WRITE event to the dispatch function."""
        self.logger.debug("Saved: %s" % event.pathname)
        self.dispatchfunc(event)

    def process_IN_MOVED_TO(self, event):
        """Forward an IN_MOVED_TO event to the dispatch function."""
        self.logger.debug("Moved to: %s" % event.pathname)
        self.dispatchfunc(event)
def match_filename(filename, patterns, is_casesensitive=False):
    """Return True when *filename* matches any shell-style pattern.

    filename         -- name to test; an empty name never matches
    patterns         -- a single fnmatch pattern or an iterable of patterns;
                        an empty value never matches
    is_casesensitive -- compare case-sensitively when true; by default the
                        comparison ignores case on every platform
    """
    if not filename or not patterns:
        return False
    # A lone pattern string must not be iterated character by character
    # (on Python 3 strings expose __iter__).
    if isinstance(patterns, (str, bytes)) or not hasattr(patterns, '__iter__'):
        patterns = [patterns]
    if is_casesensitive:
        return any(fnmatch.fnmatchcase(filename, pat) for pat in patterns)
    # fnmatch.fnmatch() only folds case on case-insensitive platforms, so
    # fold explicitly to get the same result everywhere.
    folded = filename.lower()
    return any(fnmatch.fnmatchcase(folded, pat.lower()) for pat in patterns)
class DirmonConfig:
    """Watch-directory configuration and dispatcher for new-file events.

    The JSON configuration maps a directory to one action or a list of
    actions.  An action is either a plain string or a dictionary with an
    'action' string plus optional keywords.  Every action string is
    classified as one of:

      'directory' -- existing writable directory; new files are copied there
      'script'    -- command line; run with the new file appended as argument
      'module'    -- name of a module in the "dirmon_action" package that
                     defines <Name>FileProcessor(dir, file, fullpath,
                     logger, **kwargs)

    self.dir2func maps each watch directory to a dictionary holding the
    list 'actiondics' and the optional per-directory 'mask' and 'recursive'.
    """

    # Optional keys of an action dictionary that apply to a single action.
    _ACTION_KEYWORDS = ('include', 'includere', 'exclude', 'excludere',
                        'kwargs', 'wait', 'shell')

    def __init__(self, file, exclude_regexps=None, logger=None):
        """Load the JSON configuration *file* and register its actions.

        file            -- pathname of the JSON configuration file
        exclude_regexps -- one regular expression or an iterable of them;
                           file names matching any (case-insensitively) are
                           ignored for every watch directory
        logger          -- optional logger; defaults to 'DirmonConfig'

        Entries whose key starts with '#' are treated as comments.  Entries
        that are not existing directories are logged and skipped.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger('DirmonConfig')
        # Close the configuration file as soon as it has been parsed.
        with open(file) as fh:
            self.dic = simplejson.load(fh)
        self.logger.debug("Loaded configuration from %s" % file)
        self.file = file
        self.dir2func = {}
        self.exclude_re = []
        if exclude_regexps:
            if isinstance(exclude_regexps, (str, bytes)) or not hasattr(exclude_regexps, '__iter__'):
                exclude_regexps = [exclude_regexps]
            for pattern in exclude_regexps:
                self.exclude_re.append(re.compile(pattern, re.I))
        for dir, actions in self.dic.items():
            if dir.startswith('#'):
                # Ignore comments
                continue
            dir = os.path.abspath(dir)  # normalise it
            if not os.path.isdir(dir):
                self.logger.error("Invalid watch directory %s in configuration %s." % (dir, self.file))
                continue
            self.set_dir_actions(dir, actions)

    def set_dir_actions(self, dir, actions):
        """Register one action or a list of actions for watch directory *dir*."""
        if not isinstance(actions, list):
            actions = [actions]
        for act in actions:
            self.set_action(dir, act)
        self.logger.debug("dir2func=%s" % self.dir2func)

    def set_action(self, dir, action):
        """Classify *action* and append it to the actions of *dir*.

        *action* is an action string or an action dictionary (see the class
        docstring).  Raises ValueError when the action is malformed, names a
        non-executable script, or names a module that cannot be imported.
        """
        if dir not in self.dir2func:
            self.dir2func[dir] = {'actiondics': []}
        dirdic = self.dir2func[dir]
        actiondic = {}
        if isinstance(action, dict):
            # Given a dictionary as action in the configuration file
            if 'action' not in action:
                raise ValueError("Action dictionary for '%s' in configuration %s requires 'action' keyword" % (dir, self.file))
            if 'mask' in action:
                # Optional per watch-directory option
                if 'mask' in dirdic:
                    raise ValueError("Multiple 'mask' defined")
                try:
                    dirdic['mask'] = int(action['mask'])
                except (TypeError, ValueError):
                    raise ValueError("Action dictionary for '%s' in configuration %s has invalid mask '%s'" % (dir, self.file, action['mask']))
            if 'recursive' in action:
                # Optional per watch-directory option
                if 'recursive' in dirdic:
                    raise ValueError("Multiple 'recursive' defined")
                dirdic['recursive'] = action['recursive']
            for kw in self._ACTION_KEYWORDS:
                # Optional per action option; empty values are ignored
                if kw in action and action[kw]:
                    if kw in ('includere', 'excludere'):
                        actiondic[kw] = re.compile(action[kw], re.I)
                    elif kw == 'kwargs' and not isinstance(action[kw], dict):
                        raise ValueError("Action dictionary for '%s' in configuration %s requires 'kwargs' to be a dictionary" % (dir, self.file))
                    else:
                        actiondic[kw] = action[kw]
            action = action['action']

        if os.path.isdir(action) and os.access(action, os.W_OK):
            # Archive directory
            actiondic['type'] = 'directory'
            actiondic['func'] = action
            self.logger.info("New files in %s will be copied to %s" % (dir, action))
        else:
            arglist = shlex.split(action)
            if not arglist:
                raise ValueError("Empty action for '%s' in configuration %s" % (dir, self.file))
            if len(arglist) > 1 or os.path.isfile(arglist[0]):
                if os.path.isabs(arglist[0]) and not os.access(arglist[0], os.X_OK):
                    raise ValueError("Action script '%s' for '%s' in %s is not executable" % (action, dir, self.file))
                actiondic['type'] = 'script'
                actiondic['func'] = arglist
                self.logger.info("New files in %s will run '%s'" % (dir, arglist[0]))
            else:
                # Probably a user-defined plugin Python module which must be
                # found in "dirmon_action" directory (under any PYTHONPATH)
                funcname = action.capitalize() + 'FileProcessor'
                try:
                    mod = __import__("dirmon_action." + action, globals(), locals(), [funcname])
                    mod_func = getattr(mod, funcname)
                except Exception as err:
                    raise ValueError("Unable to import action module '%s.%s()' for '%s': %s" % (action, funcname, dir, err))
                actiondic['type'] = 'module'
                actiondic['func'] = mod_func
        dirdic['actiondics'].append(actiondic)

    def add_watches(self, wm, default_mask):
        """Add every configured directory to the pyinotify WatchManager *wm*.

        *default_mask* is used for directories without their own 'mask';
        directories are watched non-recursively unless 'recursive' is set.
        """
        for dir, d2fdic in self.dir2func.items():
            mask = d2fdic.get('mask', default_mask)
            rec = d2fdic.get('recursive', False)
            wm.add_watch(dir, mask, rec=rec)
            msg = "Monitor %s [rec=%s mask=%s] with %d actions" % (dir, rec, mask, len(d2fdic['actiondics']))
            self.logger.info(msg)

    def do_actions(self, event):
        """Callback for pyinotify EventHandler.

        *event* must provide 'name' (file name) and 'pathname' (full path).
        The file is ignored when it matches a global exclusion pattern, no
        longer exists, or lies outside every configured watch directory.
        Otherwise every action of the watch directory whose filters accept
        the file name is executed.
        """
        file = event.name
        fullpath = event.pathname
        # Match new filename against global exclusion patterns
        for regexp in self.exclude_re:
            if regexp.search(file):
                self.logger.debug("'%s' matches global exclusion pattern - ignored." % fullpath)
                return
        if not os.path.exists(fullpath):
            self.logger.warning("'%s' gone, probably due to race condition - ignored." % fullpath)
            return
        dir = self._find_watch_dir(os.path.dirname(fullpath))
        if dir is None:
            self.logger.error("'%s' has no defined action in %s?? - ignored." % (fullpath, self.file))
            return
        for adic in self.dir2func[dir]['actiondics']:
            if self._passes_filters(adic, file, fullpath):
                self._dispatch(adic, dir, file, fullpath)

    def _find_watch_dir(self, dir):
        """Return the configured watch directory responsible for *dir*, or None.

        An exact match wins.  Otherwise *dir* may be a sub-directory of a
        recursively watched directory; the longest configured ancestor is
        chosen.  The comparison is done on whole path components so that
        "/data/in2" is not mistaken for a child of "/data/in".
        """
        if dir in self.dir2func:
            return dir
        best = None
        for candidate in self.dir2func:
            prefix = candidate.rstrip(os.sep) + os.sep
            if dir.startswith(prefix) and (best is None or len(candidate) > len(best)):
                best = candidate
        return best

    def _passes_filters(self, adic, file, fullpath):
        """Return True when *file* passes the include/exclude filters of *adic*."""
        if adic.get('exclude') and match_filename(file, adic['exclude']):
            self.logger.debug("'%s' matches exclude pattern '%s' - ignored." % (fullpath, adic['exclude']))
            return False
        if adic.get('excludere') and adic['excludere'].search(file):
            self.logger.debug("'%s' matches exclude regexp - ignored." % (fullpath))
            return False
        if adic.get('include') and not match_filename(file, adic['include']):
            self.logger.debug("'%s' does not match include pattern '%s' - ignored." % (fullpath, adic['include']))
            return False
        if adic.get('includere') and not adic['includere'].search(file):
            self.logger.debug("'%s' does not match include regexp - ignored." % (fullpath))
            return False
        return True

    @staticmethod
    def _shell_quote(text):
        """Return *text* quoted for safe use as one shell word."""
        try:
            from shlex import quote
        except ImportError:  # Python 2
            from pipes import quote
        return quote(text)

    def _dispatch(self, adic, dir, file, fullpath):
        """Run the single action *adic* for the new file.

        Dispatch is based on the action type recorded by set_action():
          'directory' -- copy fullpath into the directory
          'module'    -- call func(dir, file, fullpath, logger, **kwargs)
          'script'    -- execute the argument list with fullpath appended
        Failures of the action itself are logged, not raised.
        """
        action_type = adic['type']
        func = adic['func']
        # Work on a copy so the configured kwargs are not modified per event.
        kwargs = dict(adic.get('kwargs') or {})
        wait_flag = bool(adic.get('wait'))
        shell_flag = bool(adic.get('shell'))
        if wait_flag:
            kwargs.setdefault('wait', True)
        if shell_flag:
            kwargs.setdefault('shell', True)

        if action_type == 'directory':
            try:
                if wait_flag:
                    shutil.copy(fullpath, func)
                else:
                    subprocess.Popen(['cp', fullpath, func])
                self.logger.info("Copy to %s" % (os.path.join(func, file)))
            except Exception as err:
                self.logger.error("Failed to copy '%s' to '%s': %s" % (fullpath, func, err))
        elif action_type == 'module':
            try:
                self.logger.info("Calling %s(%s, %s) ..." % (func.__name__, fullpath, kwargs))
                retval = func(dir, file, fullpath, self.logger, **kwargs)
                self.logger.debug("%s(%s, %s) returned %s" % (func.__name__, fullpath, kwargs, retval))
            except Exception as err:
                self.logger.error("Failed to call %s(%s): %s" % (func.__name__, fullpath, err))
        elif action_type == 'script':
            try:
                self.logger.info("Executing %s %s [wait=%s, shell=%s]" % (func[0], fullpath, wait_flag, shell_flag))
                if shell_flag:
                    # The configured command is meant to be interpreted by
                    # the shell; the file name is not, so quote it.
                    command = ' '.join(func + [self._shell_quote(fullpath)])
                else:
                    command = func + [fullpath]
                if wait_flag:
                    retcode = subprocess.call(command, shell=shell_flag)
                    if retcode == 0:
                        self.logger.debug("Executed %s %s successfully" % (func[0], fullpath))
                    else:
                        self.logger.error("%s %s exited with status %s" % (func[0], fullpath, retcode))
                else:
                    subprocess.Popen(command, shell=shell_flag)
            except Exception as err:
                self.logger.error("Failed to execute %s %s [wait=%s, shell=%s]: %s" % (func[0], fullpath, wait_flag, shell_flag, err))
        else:
            raise AssertionError("Can't deal with this kind of action '%s' for '%s'" % (func, fullpath))
class ConfigFile:
    """Locate a configuration file near the current directory or the script.

    Candidate path names are built from every combination of search
    directory (the current directory, then the directory of this script),
    file name and extension, in that order.
    """

    # Location and name parts of the running script, used as defaults.
    scriptdir = os.path.dirname(__file__)
    scriptname = os.path.basename(__file__)
    scriptnoext, scriptext = os.path.splitext(scriptname)
    logger = logging.getLogger('ConfigFile')

    @staticmethod
    def _is_single(value):
        """Return True when *value* is one item rather than a collection."""
        # Strings must count as a single item even where they are iterable
        # (Python 3 strings expose __iter__).
        return isinstance(value, (str, bytes)) or not hasattr(value, '__iter__')

    @staticmethod
    def get_default_pathname(file=None, ext=None, returnlist=None):
        """Search for a configuration file and return a pathname usable by open().

        file       -- a single name or a collection of names; defaults to
                      the script name without its extension
        ext        -- a single extension or a collection of extensions to
                      append to every name; defaults to no extension
        returnlist -- when a list is given, no lookup is done: every
                      candidate pathname is appended to it and None is
                      returned

        Raises ValueError when a single absolute *file* does not exist, or
        when no candidate exists (lookup mode only).
        """
        if file:
            if ConfigFile._is_single(file):
                if os.path.isabs(file) and not os.path.isfile(file):
                    raise ValueError("Configuration file %s is non-existent" % file)
                tryfiles = [file]
            else:
                tryfiles = list(file)
        else:
            tryfiles = [ConfigFile.scriptnoext]

        if ext is None:
            exts = ['']
        elif ConfigFile._is_single(ext):
            exts = [ext]
        else:
            exts = list(ext)

        # '' stands for the current directory (and leaves absolute names as is).
        trydirs = ['']
        if ConfigFile.scriptdir:
            trydirs.append(ConfigFile.scriptdir)

        for d in trydirs:
            for f in tryfiles:
                p = os.path.join(d, f)
                for e in exts:
                    if returnlist is not None:
                        returnlist.append(p + e)
                    elif os.path.isfile(p + e):
                        return p + e
        if returnlist is None:
            raise ValueError("Cannot find configuration file %s" % file)

    def __init__(self, file=None, ext=None):
        """Remember default *file* and *ext* values for get_pathname()."""
        self.file = file
        self.ext = ext

    def get_pathname(self, file=None, ext=None, returnlist=None):
        """Like get_default_pathname(), falling back to the stored defaults."""
        if file is None:
            file = self.file
        if ext is None:
            ext = self.ext
        return ConfigFile.get_default_pathname(file=file, ext=ext, returnlist=returnlist)
def main(argv=None):
    """Parse the command line, load the configuration and run the watcher.

    argv -- full argument vector including the program name; defaults to
            sys.argv

    Returns 0 when the notifier loop ends.  Invalid options, an unusable
    log file or an unloadable configuration terminate the program through
    OptionParser.error().
    """
    if argv is None:
        argv = sys.argv
    debuglevelD = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    defvals = {
        'loglevel': 'info',
        'logfile': '/dev/stderr',
    }
    # Collect the candidate configuration path names for the help text.
    trycfgfiles = []
    ConfigFile.get_default_pathname(ext=['.ini', '.json'], returnlist=trycfgfiles)

    parser = OptionParser(description=help_text, formatter=IndentedHelpFormatterWithNL())
    parser.add_option("-c", dest="cfgfile", type="string", metavar='FILE',
                      help="Configuration file in JSON format (%s)" % trycfgfiles)
    parser.add_option("-l", dest="loglevel", type="string", metavar='LOGLEVEL',
                      help="Verbosity (%s): %s" % (defvals['loglevel'], sorted(debuglevelD)))
    parser.add_option("-d", "--daemonize", dest="daemonize", action="store_true",
                      help="Run in background")
    parser.add_option("--logfile", dest="logfile", type="string", metavar='FILE',
                      help="Log file")
    parser.set_defaults(**defvals)
    # Honour the argv parameter instead of always reading sys.argv.
    (options, args) = parser.parse_args(argv[1:])
    if options.loglevel not in debuglevelD:
        parser.error("Verbosity must be one of: %s" % sorted(debuglevelD))
    dbglvl = debuglevelD[options.loglevel]

    logger = logging.getLogger()
    logger.setLevel(dbglvl)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s - %(levelname)s - %(message)s'))
    ch.setLevel(dbglvl)
    logger.addHandler(ch)

    # Truncate logfile
    if os.path.isfile(options.logfile):
        try:
            with open(options.logfile, 'w'):
                pass
        except Exception as err:
            parser.error("Failed to truncate existing log file '%s': %s" % (options.logfile, err))

    # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
    # http://seb-m.github.com/pyinotify/pyinotify-pysrc.html#Notifier
    try:
        if not options.cfgfile:
            options.cfgfile = ConfigFile.get_default_pathname(ext=['.ini', '.json'])
        # EUMETCAST software, rsync, etc. writes to temporary files and rename when they are complete.
        # Incomplete temporary filenames match these patterns
        dmconfig = DirmonConfig(options.cfgfile, exclude_regexps=(r'^\.', r'^temp', r'^tmp',))
    except Exception as err:
        parser.error("Failed to load configuration '%s': %s" % (options.cfgfile, err))
    logger.info("%s starting using configuration %s" % (os.path.basename(__file__), options.cfgfile))

    wm = pyinotify.WatchManager()  # Watch Manager
    notifier = pyinotify.Notifier(wm, EventHandler(dmconfig.do_actions))
    default_mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    dmconfig.add_watches(wm, default_mask)
    notifier.loop(daemonize=options.daemonize, pid_file=False, stdout=options.logfile, stderr=options.logfile)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment