Skip to content

Instantly share code, notes, and snippets.

@haxwithaxe
Created October 7, 2023 14:54
Show Gist options
  • Save haxwithaxe/cb2844188854b07c17dd35eec58ce922 to your computer and use it in GitHub Desktop.
Save haxwithaxe/cb2844188854b07c17dd35eec58ce922 to your computer and use it in GitHub Desktop.
This is the script I use to forward system mail (mbox) as a digest to a regular email account.
#!/usr/bin/env python3
"""This forwards mail from the given mbox style mailboxes to a real email.
It borrows the ``haxmail.ini`` config file.
"""
from dataclasses import dataclass, field
from typing import Annotated, List
import configparser
import email
import mailbox
import os
import pathlib
import platform
from platformdirs import user_cache_path, user_config_path
import smtplib
import ssl
import sys
import typer
__author__ = 'haxwithaxe <[email protected]>'
__copyright__ = 'Copyright 2023, haxwithaxe'
__license__ = 'GPLv3'
DEFAULT_CONFIG_PATHS = (user_config_path('haxmail.ini'),
pathlib.Path('/usr/local/etc/haxmail.ini'),
pathlib.Path('/etc/haxmail.ini'))
CONFIG_PATH_HELP = ('The haxmail.ini file. If no file is given this searches '
'the following directories in the following order: '
f'{", ".join(DEFAULT_CONFIG_PATHS)}.')
app = typer.Typer()
class ConfigError(Exception):
"""A configuration error.
Raised if an error in configuration is found.
Arguments:
key (str): The config/argument name to be used in the message.
value_type (callable, optional): The type constructor of the config
value.
extend (str, optional): A message to append to the base message.
"""
def __init__(self, key, value_type=None, extend=None): # noqa: D107
message = (f'Missing value for {key}. Use the `--{key}` argument '
f'or set `{key}` in the config file.')
if value_type:
message = (f'Argument `--{key}` or config `{key}` must be an '
f'instance of type {value_type}.')
if extend:
message = f'{message} {extend}'
super().__init__(message)
class Config:
"""The configuration for the report generator.
Arguments:
subject: The outgoing email subject.
to: The outgoing email recipient.
smtp_username: The SMTP username.
smtp_password: The SMTP password.
smtp_server: The SMTP server address/domain.
smtp_port: The SMTP server port.
"""
def __init__(self, subject: str, to: str, smtp_username: str, # noqa: D107
smtp_password: str, smtp_server: str, smtp_port: int, **_):
self.subject = subject
self.to = to
self.smtp_username = smtp_username
self.smtp_password = smtp_password
self.smtp_server = smtp_server
self.smtp_port = smtp_port
def fill(
self,
subject: str = None,
to: str = None,
smtp_username: str = None,
smtp_password: str = None,
smtp_server: str = None,
smtp_port: int = None,
**_
):
"""Only fill in values that haven't been set yet."""
self.subject = self.subject or subject
self.to = self.to or to
self.smtp_username = self.smtp_username or smtp_username
self.smtp_password = self.smtp_password or smtp_password
self.smtp_server = self.smtp_server or smtp_server
self.smtp_port = self.smtp_port or smtp_port
def load(self, config_path: pathlib.Path):
"""Load config values from `config_path`."""
if not config_path.is_file():
return False
try:
config_text = config_path.read_text()
except PermissionError:
print('Permission to read', config_path, 'was denied.',
file=sys.stderr)
return False
conf_parser = configparser.ConfigParser()
conf_parser.read_string(config_text)
if 'mail' not in conf_parser.sections():
print(config_path, 'is not a valid config file', file=sys.stderr)
return False
self.fill(
**{k.replace('-', '_'): v for k, v in conf_parser['mail'].items()}
)
return True
def __iter__(self) -> iter: # noqa: D105
return iter({
'subject': self.subject,
'to': self.to,
'smtp_username': self.smtp_username,
'smtp_password': self.smtp_password,
'smtp_server': self.smtp_server,
'smtp_port': self.smtp_port
})
def __repr__(self) -> str: # noqa: D105
return (
'<Config '
f'to={self.to}, '
f'smtp_username={self.smtp_username}, '
f'smtp_password={self.smtp_password}, '
f'smtp_server={self.smtp_server}, '
f'smtp_port={self.smtp_port}'
'>'
)
def border(width: int) -> str:
"""Return a single height border `width` characters wide."""
return '-' * width
def double_border(width: int) -> str:
"""Return a double height border `width` characters wide."""
return '\n'.join(['-' * width, f'*{" "*8}' * 9, '-' * width])
@dataclass
class Message:
"""Message formatting wrapper."""
message: mailbox.mboxMessage
"""The message as taken from `mailbox`."""
@property
def width(self) -> int:
"""The maximum width of the message."""
headers_width = max(len(l) for l in self.headers) # noqa: E741
body_width = max(len(l) for l in self.body.split('\n')) # noqa: E741
return max([headers_width, body_width])
@property
def body(self) -> str:
"""The message body/payload as a `str`."""
return self.message.get_payload(decode=True).decode()
@property
def headers(self) -> list[str]:
"""The message headers to be displayed in the email."""
return [
f'Subject: {self.message.get("Subject")}',
f'From: {self.message.get("From")}',
f'To: {self.message.get("To")}',
f'Date: {self.message.get("Date")}'
]
def format(self, width: int) -> str:
"""Return a formatted message with headers."""
return '\n'.join([
'\n'.join(self.headers),
'-' * width,
self.body,
double_border(width),
'\n'
])
def set_mail_header(mail, key, header_key, value):
"""Set a header value in the `mail` object.
Raises:
ConfigError: When the given header is not set and not set in the
message.
"""
if not value and not mail.get(header_key):
raise ConfigError(key, extend=f'Nothing is given for {header_key} '
'header and it is empty.')
if mail.get(header_key):
mail.replace_header(header_key, value)
else:
mail.add_header(header_key, value)
@dataclass
class Report:
"""Report generator."""
config: dict
"""The config as a `dict`."""
user: str = os.getlogin()
"""The system user name. Defaults to the user running this script."""
hostname: str = platform.node()
"""The system hostname. Defaults to the detected hostname of the system
the script is being run on."""
preface: str = ('The following messages for {self.user} were found on '
'{self.hostname}.')
"""The text to show at the top of the email before the messages. It will be
formatted at the time of sending."""
_messages: list[Message] = field(default_factory=list)
"""A `list` of Message objects."""
@property
def width(self) -> int:
"""The maximum width of all messages."""
return max(m.width for m in self._messages)
def append(self, message: mailbox.mboxMessage):
"""Append a message to the report."""
self._messages.append(Message(message))
def get_format_vars(self) -> dict:
"""Return the `dict` of variables for use in formatting the report."""
return {
'user': self.user,
'hostname': self.hostname,
'self': self
}
def send(self):
"""Send the report as an email.
Raises:
ConfigError: When a report email header is not available to be set
and is not already set.
"""
if not self._messages:
return False
subject = f'Local mail for {self.user} from {self.hostname}'
msg = email.message.EmailMessage()
msg.set_content(str(self))
msg.set_default_type('text/plain')
msg.set_type('text/plain')
set_mail_header(msg, 'sender-email', 'From',
f'mbox <{self.config.smtp_username}>')
set_mail_header(msg, 'subject', 'Subject',
self.config.subject or subject)
set_mail_header(msg, 'to', 'To', self.config.to)
msg.set_charset('utf-8')
tls_context = ssl.create_default_context()
with smtplib.SMTP(self.config.smtp_server, self.config.smtp_port) as \
server:
server.ehlo()
server.starttls(context=tls_context)
server.ehlo()
server.login(self.config.smtp_username, self.config.smtp_password)
server.sendmail(self.config.smtp_username, self.config.to,
msg.as_bytes())
return True
def __str__(self): # noqa: D105
width = self.width
report = [self.preface, double_border(len(self.preface))]
for message in self._messages:
report.append(message.format(width))
return '\n'.join(report).format(**self.get_format_vars())
@app.command()
def main( # noqa: D103
mbox: Annotated[List[pathlib.Path], typer.Option(...)] = None,
subject: str = typer.Option(None, # noqa: B008
help='The subject of the outgoing email.'),
to: str = typer.Option(None, # noqa: B008
help='The recipient of the outgoing email.'),
smtp_username: str = typer.Option(None, # noqa: B008
help='The SMTP username. Defaults '
'to the value in the config.'),
smtp_password: str = typer.Option(None, # noqa: B008
help='The SMTP password. Defaults '
'to the value in the config.'),
smtp_server: str = typer.Option(None, help='The SMTP server ' # noqa: B008
'address/hostname. Defaults to the value '
'in the config.'),
smtp_port: int = typer.Option(None, # noqa: B008
help='The SMTP server port number. '
'Defaults to the value in the config.'),
config: pathlib.Path = typer.Option(None, # noqa: B008
help=CONFIG_PATH_HELP)
):
conf = Config(subject, to, smtp_username, smtp_password, smtp_server,
smtp_port)
if config:
conf.load(config)
else:
for conf_path in DEFAULT_CONFIG_PATHS:
if conf.load(conf_path):
break
report = Report(conf)
if not mbox:
print('At least one mbox is required.')
raise typer.Exit(1)
for box in mbox:
for message in mailbox.mbox(box):
report.append(message)
if report.send():
cache_path = user_cache_path('mbox-forward.cache')
cache_path.write_bytes(b'')
for box in mbox:
with cache_path.open('ab') as cache:
cache.write(box.read_bytes())
os.truncate(box, 0)
if __name__ == '__main__':
app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment