Created
May 16, 2024 17:16
-
-
Save mohllal/ae99252a79bda4af30f16630e49f72ea to your computer and use it in GitHub Desktop.
A custom SMTP transport for the marrow/mailer python package
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
# Fix to marrow/mailer issue: https://github.com/marrow/mailer/issues/87#issuecomment-689586587 | |
sys.modules["cgi.parse_qsl"] = None | |
from smtplib import SMTP, SMTP_SSL | |
from marrow.mailer import Mailer | |
from marrow.mailer.exc import TransportException | |
from marrow.mailer.transport.smtp import SMTPTransport | |
class CustomSMTPTransport(SMTPTransport):
    """SMTP transport that passes host and port to the smtplib constructors.

    Workaround for the marrow/mailer issue: https://github.com/marrow/mailer/issues/83
    The issue prevents sending emails using TLS negotiation.
    It happens when the SMTP instance attempts to connect without specifying
    the 'host' and 'port' parameters to the constructors for the
    SMTP/SMTP_SSL classes.
    More context is on the python/cpython issue: https://github.com/python/cpython/issues/80275
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base transport, then set a 10-second timeout."""
        super().__init__(*args, **kwargs)
        # Assigned after the base initialiser so this is the value that
        # connect_to_server() hands to smtplib.
        self.timeout = 10

    def _client_tls_context(self):
        """Return an SSLContext carrying the client certificate, or None.

        None is returned when neither ``keyfile`` nor ``certfile`` is set, in
        which case smtplib is left to build its own default context.
        """
        if not (self.keyfile or self.certfile):
            return None

        # Local import: only needed when a client certificate is configured.
        import ssl

        # NOTE(review): create_default_context() verifies the server
        # certificate and hostname, which the removed keyfile/certfile
        # arguments did not enforce -- confirm this suits your server.
        context = ssl.create_default_context()
        context.load_cert_chain(self.certfile, self.keyfile)
        return context

    def connect_to_server(self):
        """Open, secure and authenticate the SMTP connection.

        On success the connection is stored on ``self.connection`` and
        ``self.sent`` is reset to 0.

        Raises:
            TransportException: if ``tls`` is "required" but the server does
                not advertise STARTTLS.
        """
        context = self._client_tls_context()
        # keyfile/certfile were removed from smtplib in Python 3.12; use the
        # ``context`` parameter instead, and only pass it when we have one so
        # the default behaviour is untouched.
        tls_kwargs = {} if context is None else {"context": context}

        # Giving host/port to the constructor makes smtplib connect right
        # away and remember the host name for the later TLS handshake.
        if self.tls == "ssl":  # pragma: no cover
            connection = SMTP_SSL(
                host=self.host,
                port=self.port,
                local_hostname=self.local_hostname,
                timeout=self.timeout,
                **tls_kwargs,
            )
        else:
            connection = SMTP(
                host=self.host,
                port=self.port,
                local_hostname=self.local_hostname,
                timeout=self.timeout,
            )

        try:
            connection.set_debuglevel(self.debug)

            # The constructor only connects when host is truthy; connecting a
            # second time would abandon the socket it has already opened.
            if not self.host:
                connection.connect(self.host, self.port)

            # Do TLS handshake if configured
            connection.ehlo()
            if self.tls in ("required", "optional", True):
                if connection.has_extn("STARTTLS"):  # pragma: no cover
                    connection.starttls(**tls_kwargs)
                elif self.tls == "required":
                    raise TransportException("TLS is required but not available on the server -- aborting")

            # Authenticate to server if necessary
            if self.username and self.password:
                connection.login(self.username, self.password)
        except Exception:
            # Do not leak the socket when setup fails part-way through.
            connection.close()
            raise

        self.connection = connection
        self.sent = 0
# Example wiring: an immediate-delivery Mailer that uses the custom transport.
# Replace the <...> placeholders with the real server details before use.
_mailer_config = {}
_mailer_config["manager.use"] = "immediate"
_mailer_config["transport.use"] = CustomSMTPTransport
_mailer_config["transport.host"] = "<smtp_server_host>"
_mailer_config["transport.port"] = "<smtp_server_port>"
_mailer_config["transport.tls"] = "required"
_mailer_config["transport.username"] = "<smtp_server_username>"
_mailer_config["transport.password"] = "<smtp_server_password>"
_mailer_config["transport.debug"] = True

smtp_client = Mailer(_mailer_config)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment