Last active
January 27, 2022 09:21
-
-
Save jn0/4d5046c9decbba0292eaff3c80289605 to your computer and use it in GitHub Desktop.
Simple SMTP Mail Sender
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Run as `echo MESSAGE | smail.py smail.yaml [email protected] -s "Note: warning"` | |
where `smail.yaml` looks like this: | |
---------------------------------- >8 ----------------------------------------- | |
mail: | |
host: smtp.mail.net | |
sender: [email protected] | |
password: !secret mail_password | |
name: Bot #1 | |
port: 587 | |
timeout: 15 | |
encryption: starttls | |
logging: !include smail.logging.yaml | |
---------------------------------- >8 ----------------------------------------- | |
Add `smail.secret.yaml` next to the `smail.yaml` (and | |
`chmod g=,o= *secret.yaml` it -- "install" will take care of it) and put the | |
secrets there: | |
---------------------------------- >8 ----------------------------------------- | |
mail_password: SuPeRsEcReTpAsSw0rD | |
---------------------------------- >8 ----------------------------------------- | |
Add quite common python logging config to `smail.logging.yaml` next to the | |
`smail.yaml` file like that one: | |
---------------------------------- >8 ----------------------------------------- | |
version: 1 | |
incremental: false | |
disable_existing_loggers: false | |
formatters: | |
short: | |
format: '%(asctime)s %(name)s:%(levelname)s %(message)s' | |
filters: {} | |
handlers: | |
standard: | |
level: INFO | |
formatter: short | |
class: logging.StreamHandler | |
stream: ext://sys.stderr | |
root: | |
level: INFO | |
handlers: [standard] | |
loggers: {} | |
---------------------------------- >8 ----------------------------------------- | |
''' | |
import logging; log = logging.getLogger('smail') # noqa:E702 | |
import sys | |
import os | |
import stat | |
import re | |
import argparse | |
import yaml | |
import smtplib | |
# ----------------------------------------------------------------------------- | |
INIT_CONFIG = { | |
"mail": { | |
"host": "smtp.mail.net", | |
"sender": "[email protected]", | |
"password": "!secret mail_password", | |
"name": "Bot #1", | |
"port": 587, | |
"timeout": 15, | |
"encryption": "starttls", | |
}, | |
"logging": "!include smail.logging.yaml", | |
} | |
INIT_SECRET_CONFIG = { | |
"mail_password": "SuPeRsEcReTpAsSw0rD" | |
} | |
INIT_LOG_CONFIG = { | |
"version": 1, | |
"incremental": False, | |
"disable_existing_loggers": False, | |
"formatters": { | |
"short": { | |
"format": "%(asctime)s %(levelname)s %(message)s", | |
} | |
}, | |
"filters": {}, | |
"handlers": { | |
"standard": { | |
"level": "INFO", | |
"formatter": "short", | |
"class": "logging.StreamHandler", | |
"stream": "ext://sys.stderr", | |
} | |
}, | |
"root": { | |
"level": "INFO", | |
"handlers": ["standard"], | |
}, | |
"loggers": {} | |
} | |
# ----------------------------------------------------------------------------- | |
# import yaml, os, stat | |
class MyLoader(yaml.SafeLoader): | |
@classmethod | |
def load(kls, source, **kw): | |
if isinstance(source, str): | |
with open(source) as f: | |
return yaml.load(f, kls, **kw) | |
assert hasattr(source, 'read'), repr(source) | |
return yaml.load(source, kls, **kw) | |
def __init__(self, stream): | |
self._file = stream.name | |
self._root = os.path.dirname(self._file) | |
self._secret = None | |
super().__init__(stream) | |
def secret(self, node): | |
if self._secret is None: | |
filename = os.path.splitext(self._file)[0] + '.secret.yaml' | |
self._secret = self.load(filename) | |
return self._secret.get(self.construct_scalar(node)) | |
def include(self, node): | |
name = self.construct_scalar(node) | |
filename = name if os.path.isabs(name) else \ | |
os.path.join(self._root, name) | |
return self.load(filename) | |
MyLoader.add_constructor('!include', MyLoader.include) | |
MyLoader.add_constructor('!secret', MyLoader.secret) | |
SAFE_MASK = ~(stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | | |
stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) | |
def create_config(name, config, mask=None): | |
import io | |
with io.StringIO() as f: | |
yaml.safe_dump(config, f, indent=2, allow_unicode=True) | |
text = f.getvalue().splitlines() | |
with open(name, 'wt') as f: | |
for line in text: | |
if line and ": '!" in line: | |
tag, val = line.split(':', 1) | |
val = val.strip().strip("'") | |
line = tag + ': ' + val | |
f.write(line + '\n') | |
if mask: | |
st = os.stat(name) | |
mode = st.st_mode & mask | |
os.chmod(name, mode) | |
# ----------------------------------------------------------------------------- | |
# import smtplib | |
class LoggingSMTP(smtplib.SMTP): | |
logger = log.info | |
def _print_debug(self, *args): | |
''' | |
As it's the logger's business to prepend stamps, the debuglevel | |
exact value is now ignored. | |
''' | |
if self.debuglevel: | |
LoggingSMTP.logger('%s', ' '.join(str(a) for a in args)) | |
# ----------------------------------------------------------------------------- | |
# import argparse, os, sys | |
class ARG(argparse.ArgumentParser): | |
def add(self, *av, **kw): | |
self.add_argument(*av, **kw) | |
return self | |
def parse(self): | |
from logging.config import dictConfig | |
args = self.parse_args() | |
if args.recepient == 'install': | |
name = os.path.splitext(os.path.basename(args.config))[0] | |
INIT_CONFIG['logging'] = INIT_CONFIG['logging'].replace('smail', name) # noqa:E501 | |
logcf = INIT_CONFIG['logging'].split()[1] | |
secret = os.path.splitext(args.config)[0] + '.secret.yaml' | |
for f in (args.config, logcf, secret): | |
if os.path.exists(f): | |
log.error('Already installed %r', f) | |
sys.exit(1) | |
create_config(args.config, INIT_CONFIG) | |
create_config(secret, INIT_SECRET_CONFIG, SAFE_MASK) | |
create_config(logcf, INIT_LOG_CONFIG) | |
log.warning('Please edit these files: %r, %r, and %r', | |
args.config, logcf, secret) | |
sys.exit(0) | |
args.config = MyLoader.load(args.config) | |
if args.config.get('logging'): | |
dictConfig(args.config['logging']) | |
if args.log_level is not None: | |
logging.root.setLevel(args.log_level.upper()) | |
log.debug('Log level set to %r', args.log_level.upper()) | |
if args.subject is None: | |
args.subject = args.config.get('mail', {}).get('name') | |
if args.text is None or args.text == '-': | |
text = sys.stdin.read() | |
args.text = text.decode() if isinstance(text, bytes) else text | |
return args | |
arg = ARG( | |
description='Simple SMTP Mail Sender', | |
).add( | |
dest='config', | |
help='YAML config file', | |
).add( | |
dest='recepient', | |
help='Target e-mail address or' | |
' the "install" word (to install sample config).', | |
).add( | |
'-s', '--subject', | |
action='store', | |
help='Mail subject' | |
).add( | |
'--html', | |
action='store_true', default=False, | |
help='Mail text is HTML.' | |
).add( | |
'--log-level', | |
action='store', | |
help='Set log level.' | |
).add( | |
'--dry-run', | |
action='store_true', default=False, | |
help='Do not send mail.' | |
).add( | |
'-t', '--text', | |
action='store', | |
help='Mail text. Omit or specify dash `-` to read STDIN.' | |
).parse() | |
# ----------------------------------------------------------------------------- | |
# import re | |
TAG_2_NL = re.compile(r'(<p\s+[^>]*>)|(<table\s+[^>]*>)|(<table>)|(<p>)|' | |
r'(</p>)|(</tr>)|(</table>)', re.I) | |
X_TAG = re.compile(r'</?[a-z]+[^>]*>', re.I) | |
NLS = re.compile(r'[\n]{2,}') | |
def html2text(html): | |
text = ' '.join(html.split()) | |
text = re.sub(NLS, '\n\n', | |
re.sub(X_TAG, '', | |
re.sub(TAG_2_NL, '\n', text) | |
.replace('</th>', ' ').replace('</TH>', ' ') | |
) | |
) | |
return text | |
# ----------------------------------------------------------------------------- | |
def make_mail(to, cf, subj, text): | |
# https://stackoverflow.com/a/882770/1278896 | |
from email.message import EmailMessage | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
msg = MIMEMultipart('alternative') if arg.html else EmailMessage() | |
frm = f'{cf["name"]} <{cf["sender"]}>' | |
msg.set_charset('utf-8') | |
msg.set_unixfrom(frm) | |
msg.add_header('To', to) | |
msg.add_header('From', frm) | |
msg.add_header('Subject', subj) | |
if arg.html: | |
msg.attach(MIMEText(html2text(text), 'plain', 'utf-8')) | |
msg.attach(MIMEText(text, 'html', 'utf-8')) | |
else: | |
msg.set_content(text, charset='utf-8', cte='8bit') | |
return msg | |
def send_mail(config, message, to, debug=0): | |
import ssl | |
mode = config.get('encryption', '').strip().lower() | |
host = config.get('host', 'localhost') | |
port = config.get('port', 587 if mode == 'starttls' else 25) | |
timeout = config.get('timeout', 10) | |
sndr = config.get('sender') | |
context = ssl.SSLContext(ssl.PROTOCOL_TLS) if mode == 'starttls' else None | |
log.debug('About to send mail to %r:%r (%s) from %r', | |
host, port, mode.upper() or 'PLAIN', sndr) | |
if arg.dry_run: | |
log.warning('MAIL NOT SENT IN DRY RUN') | |
return | |
with LoggingSMTP(host=host, port=port, timeout=timeout) as smtp: | |
smtp.set_debuglevel(debug) | |
smtp.ehlo() | |
if mode == 'starttls': | |
smtp.starttls(context=context) | |
smtp.ehlo() | |
if config.get('password'): | |
smtp.login(config['sender'], config['password']) | |
smtp.send_message(message, sndr, [to]) | |
smtp.quit() | |
# ----------------------------------------------------------------------------- | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) | |
try: | |
log.info('start') | |
for n in ('config', 'recepient', 'subject', 'text', 'config'): | |
v = getattr(arg, n) | |
log.debug('%r: %r', n, v) | |
mail_config = arg.config['mail'] | |
message = make_mail(arg.recepient, mail_config, arg.subject, arg.text) | |
send_mail( | |
config=mail_config, | |
message=message, | |
to=arg.recepient, | |
debug=0 if log.level >= logging.INFO else 1, | |
) | |
finally: | |
log.info('stop') | |
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF # |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The help:
The idea:
Initial deploy: