Created
October 7, 2023 14:54
-
-
Save haxwithaxe/cb2844188854b07c17dd35eec58ce922 to your computer and use it in GitHub Desktop.
This is the script I use to forward system mail (mbox) as a digest to a regular email account.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""This forwards mail from the given mbox style mailboxes to a real email. | |
It borrows the ``haxmail.ini`` config file. | |
""" | |
from dataclasses import dataclass, field | |
from typing import Annotated, List | |
import configparser | |
import email | |
import mailbox | |
import os | |
import pathlib | |
import platform | |
from platformdirs import user_cache_path, user_config_path | |
import smtplib | |
import ssl | |
import sys | |
import typer | |
__author__ = 'haxwithaxe <[email protected]>' | |
__copyright__ = 'Copyright 2023, haxwithaxe' | |
__license__ = 'GPLv3' | |
DEFAULT_CONFIG_PATHS = (user_config_path('haxmail.ini'), | |
pathlib.Path('/usr/local/etc/haxmail.ini'), | |
pathlib.Path('/etc/haxmail.ini')) | |
CONFIG_PATH_HELP = ('The haxmail.ini file. If no file is given this searches ' | |
'the following directories in the following order: ' | |
f'{", ".join(DEFAULT_CONFIG_PATHS)}.') | |
app = typer.Typer() | |
class ConfigError(Exception): | |
"""A configuration error. | |
Raised if an error in configuration is found. | |
Arguments: | |
key (str): The config/argument name to be used in the message. | |
value_type (callable, optional): The type constructor of the config | |
value. | |
extend (str, optional): A message to append to the base message. | |
""" | |
def __init__(self, key, value_type=None, extend=None): # noqa: D107 | |
message = (f'Missing value for {key}. Use the `--{key}` argument ' | |
f'or set `{key}` in the config file.') | |
if value_type: | |
message = (f'Argument `--{key}` or config `{key}` must be an ' | |
f'instance of type {value_type}.') | |
if extend: | |
message = f'{message} {extend}' | |
super().__init__(message) | |
class Config: | |
"""The configuration for the report generator. | |
Arguments: | |
subject: The outgoing email subject. | |
to: The outgoing email recipient. | |
smtp_username: The SMTP username. | |
smtp_password: The SMTP password. | |
smtp_server: The SMTP server address/domain. | |
smtp_port: The SMTP server port. | |
""" | |
def __init__(self, subject: str, to: str, smtp_username: str, # noqa: D107 | |
smtp_password: str, smtp_server: str, smtp_port: int, **_): | |
self.subject = subject | |
self.to = to | |
self.smtp_username = smtp_username | |
self.smtp_password = smtp_password | |
self.smtp_server = smtp_server | |
self.smtp_port = smtp_port | |
def fill( | |
self, | |
subject: str = None, | |
to: str = None, | |
smtp_username: str = None, | |
smtp_password: str = None, | |
smtp_server: str = None, | |
smtp_port: int = None, | |
**_ | |
): | |
"""Only fill in values that haven't been set yet.""" | |
self.subject = self.subject or subject | |
self.to = self.to or to | |
self.smtp_username = self.smtp_username or smtp_username | |
self.smtp_password = self.smtp_password or smtp_password | |
self.smtp_server = self.smtp_server or smtp_server | |
self.smtp_port = self.smtp_port or smtp_port | |
def load(self, config_path: pathlib.Path): | |
"""Load config values from `config_path`.""" | |
if not config_path.is_file(): | |
return False | |
try: | |
config_text = config_path.read_text() | |
except PermissionError: | |
print('Permission to read', config_path, 'was denied.', | |
file=sys.stderr) | |
return False | |
conf_parser = configparser.ConfigParser() | |
conf_parser.read_string(config_text) | |
if 'mail' not in conf_parser.sections(): | |
print(config_path, 'is not a valid config file', file=sys.stderr) | |
return False | |
self.fill( | |
**{k.replace('-', '_'): v for k, v in conf_parser['mail'].items()} | |
) | |
return True | |
def __iter__(self) -> iter: # noqa: D105 | |
return iter({ | |
'subject': self.subject, | |
'to': self.to, | |
'smtp_username': self.smtp_username, | |
'smtp_password': self.smtp_password, | |
'smtp_server': self.smtp_server, | |
'smtp_port': self.smtp_port | |
}) | |
def __repr__(self) -> str: # noqa: D105 | |
return ( | |
'<Config ' | |
f'to={self.to}, ' | |
f'smtp_username={self.smtp_username}, ' | |
f'smtp_password={self.smtp_password}, ' | |
f'smtp_server={self.smtp_server}, ' | |
f'smtp_port={self.smtp_port}' | |
'>' | |
) | |
def border(width: int) -> str: | |
"""Return a single height border `width` characters wide.""" | |
return '-' * width | |
def double_border(width: int) -> str: | |
"""Return a double height border `width` characters wide.""" | |
return '\n'.join(['-' * width, f'*{" "*8}' * 9, '-' * width]) | |
@dataclass | |
class Message: | |
"""Message formatting wrapper.""" | |
message: mailbox.mboxMessage | |
"""The message as taken from `mailbox`.""" | |
@property | |
def width(self) -> int: | |
"""The maximum width of the message.""" | |
headers_width = max(len(l) for l in self.headers) # noqa: E741 | |
body_width = max(len(l) for l in self.body.split('\n')) # noqa: E741 | |
return max([headers_width, body_width]) | |
@property | |
def body(self) -> str: | |
"""The message body/payload as a `str`.""" | |
return self.message.get_payload(decode=True).decode() | |
@property | |
def headers(self) -> list[str]: | |
"""The message headers to be displayed in the email.""" | |
return [ | |
f'Subject: {self.message.get("Subject")}', | |
f'From: {self.message.get("From")}', | |
f'To: {self.message.get("To")}', | |
f'Date: {self.message.get("Date")}' | |
] | |
def format(self, width: int) -> str: | |
"""Return a formatted message with headers.""" | |
return '\n'.join([ | |
'\n'.join(self.headers), | |
'-' * width, | |
self.body, | |
double_border(width), | |
'\n' | |
]) | |
def set_mail_header(mail, key, header_key, value): | |
"""Set a header value in the `mail` object. | |
Raises: | |
ConfigError: When the given header is not set and not set in the | |
message. | |
""" | |
if not value and not mail.get(header_key): | |
raise ConfigError(key, extend=f'Nothing is given for {header_key} ' | |
'header and it is empty.') | |
if mail.get(header_key): | |
mail.replace_header(header_key, value) | |
else: | |
mail.add_header(header_key, value) | |
@dataclass | |
class Report: | |
"""Report generator.""" | |
config: dict | |
"""The config as a `dict`.""" | |
user: str = os.getlogin() | |
"""The system user name. Defaults to the user running this script.""" | |
hostname: str = platform.node() | |
"""The system hostname. Defaults to the detected hostname of the system | |
the script is being run on.""" | |
preface: str = ('The following messages for {self.user} were found on ' | |
'{self.hostname}.') | |
"""The text to show at the top of the email before the messages. It will be | |
formatted at the time of sending.""" | |
_messages: list[Message] = field(default_factory=list) | |
"""A `list` of Message objects.""" | |
@property | |
def width(self) -> int: | |
"""The maximum width of all messages.""" | |
return max(m.width for m in self._messages) | |
def append(self, message: mailbox.mboxMessage): | |
"""Append a message to the report.""" | |
self._messages.append(Message(message)) | |
def get_format_vars(self) -> dict: | |
"""Return the `dict` of variables for use in formatting the report.""" | |
return { | |
'user': self.user, | |
'hostname': self.hostname, | |
'self': self | |
} | |
def send(self): | |
"""Send the report as an email. | |
Raises: | |
ConfigError: When a report email header is not available to be set | |
and is not already set. | |
""" | |
if not self._messages: | |
return False | |
subject = f'Local mail for {self.user} from {self.hostname}' | |
msg = email.message.EmailMessage() | |
msg.set_content(str(self)) | |
msg.set_default_type('text/plain') | |
msg.set_type('text/plain') | |
set_mail_header(msg, 'sender-email', 'From', | |
f'mbox <{self.config.smtp_username}>') | |
set_mail_header(msg, 'subject', 'Subject', | |
self.config.subject or subject) | |
set_mail_header(msg, 'to', 'To', self.config.to) | |
msg.set_charset('utf-8') | |
tls_context = ssl.create_default_context() | |
with smtplib.SMTP(self.config.smtp_server, self.config.smtp_port) as \ | |
server: | |
server.ehlo() | |
server.starttls(context=tls_context) | |
server.ehlo() | |
server.login(self.config.smtp_username, self.config.smtp_password) | |
server.sendmail(self.config.smtp_username, self.config.to, | |
msg.as_bytes()) | |
return True | |
def __str__(self): # noqa: D105 | |
width = self.width | |
report = [self.preface, double_border(len(self.preface))] | |
for message in self._messages: | |
report.append(message.format(width)) | |
return '\n'.join(report).format(**self.get_format_vars()) | |
@app.command() | |
def main( # noqa: D103 | |
mbox: Annotated[List[pathlib.Path], typer.Option(...)] = None, | |
subject: str = typer.Option(None, # noqa: B008 | |
help='The subject of the outgoing email.'), | |
to: str = typer.Option(None, # noqa: B008 | |
help='The recipient of the outgoing email.'), | |
smtp_username: str = typer.Option(None, # noqa: B008 | |
help='The SMTP username. Defaults ' | |
'to the value in the config.'), | |
smtp_password: str = typer.Option(None, # noqa: B008 | |
help='The SMTP password. Defaults ' | |
'to the value in the config.'), | |
smtp_server: str = typer.Option(None, help='The SMTP server ' # noqa: B008 | |
'address/hostname. Defaults to the value ' | |
'in the config.'), | |
smtp_port: int = typer.Option(None, # noqa: B008 | |
help='The SMTP server port number. ' | |
'Defaults to the value in the config.'), | |
config: pathlib.Path = typer.Option(None, # noqa: B008 | |
help=CONFIG_PATH_HELP) | |
): | |
conf = Config(subject, to, smtp_username, smtp_password, smtp_server, | |
smtp_port) | |
if config: | |
conf.load(config) | |
else: | |
for conf_path in DEFAULT_CONFIG_PATHS: | |
if conf.load(conf_path): | |
break | |
report = Report(conf) | |
if not mbox: | |
print('At least one mbox is required.') | |
raise typer.Exit(1) | |
for box in mbox: | |
for message in mailbox.mbox(box): | |
report.append(message) | |
if report.send(): | |
cache_path = user_cache_path('mbox-forward.cache') | |
cache_path.write_bytes(b'') | |
for box in mbox: | |
with cache_path.open('ab') as cache: | |
cache.write(box.read_bytes()) | |
os.truncate(box, 0) | |
if __name__ == '__main__': | |
app() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment