Last active
December 24, 2015 16:38
-
-
Save wrouesnel/6829044 to your computer and use it in GitHub Desktop.
A daemon script for watching watching IMAP accounts with IDLE and invoking getmail. Works with GetMail config files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import imaplib2 | |
import time | |
import threading | |
import subprocess | |
import argparse | |
import signal | |
import psutil | |
import sys | |
import os | |
import ConfigParser | |
import pwd | |
import traceback | |
import Queue | |
import logging | |
import logging.handlers | |
import socket | |
# use this to flash our emergency mail on a hard shutdown | |
from email.mime.text import MIMEText | |
# Logging | |
verbosity = 0 | |
pidfile = None | |
daemonize = False | |
start_shutdown = threading.Event() | |
CONST_shutdown = "dienow" | |
# Send notification mail to clients about server status | |
def notify_mail(subject, message, idler=None): | |
if idler is not None: | |
message = message + '\n' + "%s %s" % (idler.username, idler.server) | |
msg = MIMEText(message) | |
msg["From"] = pwd.getpwuid(os.getuid())[0] | |
msg["To"] = pwd.getpwuid(os.getuid())[0] | |
msg["Subject"] = "Getmail Idler: %s" % subject | |
try: | |
p = subprocess.Popen(['/usr/sbin/sendmail','-t'],stdin=subprocess.PIPE) | |
p.communicate(msg.as_string()) | |
except: | |
logger.error("Got exception trying to send notification email! %s" | |
% traceback.format_exc()) | |
# This is the threading object that does all the waiting on | |
# the event | |
class Idler(object): | |
def __init__(self, getmailConfig, reidle_queue): | |
self.getmailConfig = getmailConfig | |
self.reidle_queue = reidle_queue | |
self.M = None | |
self.onlineEvent = threading.Event() | |
logger.info("Finding config: %s" % os.path.basename(self.getmailConfig)) | |
if not os.path.exists(self.getmailConfig): | |
# try the .getmail directory | |
altconfigfile = os.path.join(os.path.expanduser("~"), ".getmail", | |
os.path.basename(self.getmailConfig)) | |
if not os.path.exists(altconfigfile): | |
raise Exception("%s does not exist!" % altconfigfile) | |
else: | |
self.getmailConfig = altconfigfile | |
logger.info("Parsing: %s" % self.getmailConfig) | |
config = ConfigParser.SafeConfigParser() | |
config.read(self.getmailConfig) | |
servertype = config.get('retriever', 'type') | |
if servertype.find('IMAP') == -1: | |
raise Exception("Non-IMAP configuration file won't be polled. " + | |
"You should not pass it in to this command.") | |
# Store these so we can reconnect on disconnect | |
self.username = config.get('retriever', 'username') | |
self.password = config.get('retriever', 'password') | |
self.server = config.get('retriever', 'server') | |
# Do the login | |
self.onlineEvent.set() # initially set this, so we get notified | |
self._imaplogin() | |
# IMAP login | |
def _imaplogin(self): | |
# FIXME: handle non-SSL servers!! | |
try: | |
if self.M is None: | |
self.M = imaplib2.IMAP4_SSL(self.server) | |
self.M.login(self.username, self.password) | |
# We need to get out of the AUTH state, so we just select the INBOX. | |
self.M.select("INBOX") | |
logger.info("%s : %s CONNECTED. Fetching new mail..." | |
% (self.username, self.server) ) | |
notify_mail("%s %s AVAILABLE!" % (self.username, self.server), | |
"IMAP server login SUCCEEDED.", | |
idler=self) | |
self.onlineEvent.set() | |
reidle_queue.put(self.idle) | |
except (imaplib2.IMAP4.abort, imaplib2.IMAP4.error, socket.error): | |
# Is this a new state change? | |
if self.onlineEvent.isSet(): | |
notify_mail("%s %s unavailable!" % (self.username, self.server), | |
"IMAP server login failed. System will retry.", | |
idler=self) | |
logger.warning("%s : %s DISCONNECTED. Attempting reconnect." | |
% (self.username, self.server)) | |
self.onlineEvent.clear() | |
def waiter(): | |
logger.debug("Waiter spawned and sleeping %s %s" | |
% (self.username, self.server)) | |
time.sleep(5) | |
logger.debug("Waiter requesting relogin attempt %s %s" | |
% (self.username, self.server)) | |
reidle_queue.put(self._imaplogin) | |
return | |
waitThread = threading.Thread(target=waiter) | |
waitThread.start() | |
def __del__(self): | |
# Close mailbox and shutdown | |
if self.M is not None: | |
logger.debug("About to close %s %s" % (self.username, self.server)) | |
self.M.close() | |
logger.debug("Closed %s %s" % (self.username, self.server)) | |
self.M.logout() | |
logger.info("Idler logout for %s : %s" % (self.username, self.server)) | |
# Start an idle thread | |
def idle(self): | |
try: | |
self.M.idle(callback=self.dosync) | |
logger.info("Started IDLE for %s on %s" % (self.username, self.server)) | |
except (imaplib2.IMAP4.abort, imaplib2.IMAP4.error, socket.error): | |
# If for some reason we're not logged on, then logon. Otherwise | |
# let the exception percolate up and kill the process. | |
self._imaplogin() | |
# The method that gets called when a new email arrives. | |
def dosync(self, args): | |
# Handle possible errors | |
result, arg, exc = args | |
logger.info("%s : %s" % (result[0], result[1])) | |
if result is None: | |
logger.info("Error during IDLE: %s" % str(exc)) | |
logger.info("Attempting reconnect...") | |
self._imaplogin() # Try and relogin | |
return | |
self.do_mailfetch() # Just fetch | |
reidle_queue.put(self.idle) # Requeue restart of IDLE command | |
# Call getmail or the nominated subprocess | |
def do_mailfetch(self): | |
logger.info("Getmail sync: %s" % self.getmailConfig) | |
output = "" | |
try: | |
logger.debug("Running getmail with config: %s" % self.getmailConfig) | |
output = subprocess.check_output(["getmail","-r",self.getmailConfig]) | |
logger.debug("%s" % output) | |
except subprocess.CalledProcessError, e: | |
output = e.output | |
logger.warning("Non-zero return for getmail (%s) %s" % | |
(self.getmailConfig, e.returncode)) | |
logger.debug("%s" % output) | |
logger.debug("Getmail sync complete for %s" % self.getmailConfig) | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-r", | |
help="getmail configuration file to use" + | |
" (can specify more then once)", action="append", | |
dest="getmailconfigs",metavar='GETMAILRC') | |
parser.add_argument("--pid-file", "-p", nargs=1, | |
help="pidfile to use for process limiting", action="store", | |
dest="pidfile") | |
parser.add_argument("--verbose", "-v", | |
help="set output verbosity", action="count", | |
dest="verbosity") | |
#parser.add_argument("--override-deliver", | |
# help="pidfile to use for process limiting", action="store", | |
# dest="delivercmd") | |
parser.add_argument("--daemonize", | |
help="should process daemonize?", action="store_true", | |
dest="daemonize") | |
parser.add_argument("--logfile", nargs=1, | |
help="file to redirect log output too (useful for daemon mode)", | |
action="store", dest="logfile") | |
args = parser.parse_args() | |
# Configure logging | |
logger = logging.getLogger() | |
log_formatter = logging.Formatter('%(levelname)s: %(message)s') | |
file_formatter = logging.Formatter('%(asctime)-15s: %(levelname)s: %(funcName)s : %(message)s') | |
verbosity = (40 - (args.verbosity*10)) if (40 - (args.verbosity*10)) > 0 else 0 | |
logger.setLevel(verbosity) | |
# Console logging | |
ch = logging.StreamHandler() | |
ch.setLevel(verbosity) | |
ch.setFormatter(log_formatter) | |
logger.addHandler(ch) | |
# Descriptors to preserve on daemonize | |
fd_keep = [] | |
# Maybe do file logging | |
logfile = os.path.realpath(args.logfile[0]) if (args.logfile is not None) else None | |
if logfile: | |
fh = logging.handlers.RotatingFileHandler(logfile, maxBytes=1048576, | |
backupCount=3) | |
fh.setLevel(verbosity) | |
fh.setFormatter(file_formatter) | |
logger.addHandler(fh) | |
fd_keep.append(fh.stream.fileno()) | |
pidfile = os.path.realpath(args.pidfile[0]) if len(args.pidfile) > 0 else None | |
daemonize = args.daemonize | |
getmailconfigs = [os.path.realpath(path) for path in args.getmailconfigs] | |
logger.debug("Pid File %s" % pidfile) | |
logger.debug("Daemonize: %s" % daemonize) | |
[logger.debug("Getmail Config File: %s" % getmailconfig) for getmailconfig in getmailconfigs] | |
logger.debug("Log File: %s" % logfile) | |
# Exit in error early if we somehow have no config data. | |
if len(args.getmailconfigs) == 0: | |
logger.error("No Getmail configurations specified - exiting now.") | |
sys.exit(1) | |
# Is process already running? | |
logger.debug("Pid file value is %s" % pidfile) | |
if pidfile: | |
# Was something running? | |
if os.path.isfile(pidfile): | |
# Is it still running? | |
pid = long(file(pidfile, 'r').readline()) | |
if psutil.pid_exists(pid): | |
logger.debug("Daemon already running. Silent termination.") | |
sys.exit(0) | |
else: | |
logger.debug("Found a stale pid file: %s" % pidfile) | |
# NOTE: Be quiet with logging at INFO level up to here, so we don't get cron | |
# spam. There are more sensible ways to do this of course. | |
# Check for GetMail executable usability | |
logger.info("Check for usable getmail...") | |
FNULL = open(os.devnull, 'w') | |
if subprocess.call("getmail --version", shell=True,stdout=FNULL, | |
stderr=subprocess.STDOUT) != 0: | |
logger.error("Getmail executable not available. It might not be installed.\n") | |
sys.exit(2) | |
logger.info("Getmail is usable.") | |
# Check if a pidfile was specified and warn | |
if not args.pidfile: | |
logger.warning("Running without a pidfile. Multiple executions may occur.") | |
# This is the time to daemonize if we're going | |
if daemonize: | |
try: | |
pid = os.fork() | |
if pid > 0: | |
# exit first parent | |
os._exit(0) | |
except OSError, e: | |
logger.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror)) | |
sys.exit(1) | |
# decouple from parent environment | |
os.chdir("/") | |
os.setsid() | |
os.umask(0) | |
# do second fork | |
try: | |
pid = os.fork() | |
if pid > 0: | |
# exit from second parent, print write new pidfile | |
logger.info("Daemonized with PID %s" % pid) | |
logger.removeHandler(ch) # remove console handler after here | |
ch.close() | |
os._exit(0) | |
except OSError, e: | |
logger.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror)) | |
sys.exit(1) | |
# At this point we are daemonized - close old file descriptors except | |
# logging | |
import resource # Resource usage logger.information. | |
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] | |
if (maxfd == resource.RLIM_INFINITY): | |
maxfd = MAXFD | |
#Iterate through and close all file descriptors. | |
for fd in range(0, maxfd): | |
try: | |
if not any(keep_fd == fd for keep_fd in fd_keep): | |
os.close(fd) | |
except OSError: # ERROR, fd wasn't open to begin with (ignored) | |
pass | |
# Redirect std streams | |
sys.stdout.flush() | |
sys.stderr.flush() | |
si = file(os.devnull, 'r') | |
so = file(os.devnull, 'a+') | |
se = file(os.devnull, 'a+', 0) | |
os.dup2(si.fileno(), sys.stdin.fileno()) | |
os.dup2(so.fileno(), sys.stdout.fileno()) | |
os.dup2(se.fileno(), sys.stderr.fileno()) | |
# Write new pid file | |
file(pidfile, 'w').write(str(os.getpid())) | |
# signal handler | |
def shutdown(sig, frame): | |
logger.info("Caught signal %s" % sig) | |
start_shutdown.set() | |
# Handle signals | |
for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT, signal.SIGABRT, | |
signal.SIGTERM): | |
signal.signal(sig, shutdown) | |
reidle_queue = Queue.Queue() # Queue for idlers requesting reset of IDLE | |
# Sets up, IDLEs and tears down workers | |
def workerhandler(): | |
idlers = [] # Hold reference to idlers in main thread to avoid GC | |
# Otherwise queue get/put can destroy them. | |
# Setup idlers | |
try: | |
for configfile in getmailconfigs: | |
idler = Idler(configfile, reidle_queue) | |
idlers.append(idler) | |
# Process queue | |
while True: | |
obj = reidle_queue.get() | |
if type(obj) == str: | |
logger.debug("Received termination signal.") | |
break | |
obj() # should be a function | |
reidle_queue.task_done() | |
reidle_queue.task_done() | |
except: | |
tb = traceback.format_exc() | |
logger.error("%s" % tb) | |
start_shutdown.set() # Main thread should die. | |
# We're about to end this thread, but the main thread is sleeping so | |
# send a signal to ourselves to wake it up die. | |
os.kill(os.getpid(), signal.SIGINT) | |
return | |
workerhandler_thread = threading.Thread(target=workerhandler) | |
# Create an idler for each getmail config file we received that uses IMAP | |
# We will die if any of these don't work, since getting our mail is kind of | |
# important. | |
try: | |
workerhandler_thread.start() | |
# Handle signals | |
while True: | |
signal.pause() | |
if start_shutdown.is_set(): | |
break | |
except Exception, e: | |
tb = traceback.format_exc() | |
logger.error("%s" % tb) | |
finally: | |
# final dispensation | |
logger.info("Queuing IDLE worker shutdown...") | |
reidle_queue.put(CONST_shutdown) | |
workerhandler_thread.join() | |
logger.info("IDLE worker joined.") | |
# Dispatch a shutdown notification | |
notify_mail("Daemon shutting down", traceback.format_exc()) | |
if pidfile: | |
try: | |
os.unlink(pidfile) | |
logger.debug("pidfile deleted.") | |
except OSError: | |
logger.warning("pidfile was already deleted?") | |
logger.info("Process exiting.") | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment