Skip to content

Instantly share code, notes, and snippets.

@masterflitzer
Last active November 26, 2023 20:13
Show Gist options
  • Save masterflitzer/b587c38ee264ec52c664cf4981baf1db to your computer and use it in GitHub Desktop.
Save masterflitzer/b587c38ee264ec52c664cf4981baf1db to your computer and use it in GitHub Desktop.
Send E-Mail from IMAP to multiple recipients over SMTP
#!/usr/bin/env python3
import os
import toml
from email import message_from_bytes, policy
from email.message import EmailMessage
from enum import Enum
from html import escape
from imaplib import IMAP4, IMAP4_SSL
from smtplib import SMTP, SMTP_SSL
from typing import Any, Optional, Union
class EmailSecurity(Enum):
    """Transport security modes selectable for the IMAP and SMTP connections.

    The member values are the strings accepted in the ``security`` setting
    of the configuration file.
    """

    # Connection is made with the SSL variant of the client class.
    Tls = "tls"
    # Plain connection on which ``starttls()`` is called before logging in.
    Starttls = "starttls"
    # Plain connection, no encryption is negotiated.
    Unencrypted = "unencrypted"
def main():
    """Forward the newest matching IMAP message to every configured recipient.

    The configuration is read from ``email-multiplexer.toml`` located next to
    this script. The matching message is searched and fetched over IMAP,
    marked as read, wrapped into a new ``multipart/mixed`` message and sent
    over SMTP.

    Raises:
        SystemExit: if the configuration is incomplete, no message matches,
            the message cannot be fetched, or its body is neither
            ``text/plain`` nor ``text/html``.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_data = toml.load(os.path.join(script_dir, "email-multiplexer.toml"))

    # A missing section is treated as empty so that the checks below report
    # the problem instead of failing with an AttributeError on None.
    general: dict[str, Any] = config_data.get("general") or {}
    imap_config: dict[str, Any] = config_data.get("imap") or {}
    smtp_config: dict[str, Any] = config_data.get("smtp") or {}

    forwarded_text: Optional[str] = general.get("forwarded_text")
    recipients = general.get("recipients")
    sender: Optional[str] = general.get("sender")
    if isinstance(recipients, str):
        # Accept a single address given as a plain string.
        recipients = [recipients]

    imap_security = email_security_from_str(imap_config.get("security"))
    smtp_security = email_security_from_str(smtp_config.get("security"))
    if imap_security is None or smtp_security is None:
        raise SystemExit(1)

    # Validate before touching the mailbox: otherwise the message would be
    # marked as read and the script would fail while building the forward.
    required = (
        ("forwarded_text", forwarded_text),
        ("recipients", recipients),
        ("sender", sender),
    )
    missing = [name for name, value in required if value is None]
    if missing:
        raise SystemExit("Missing [general] setting(s): " + ", ".join(missing))

    email_policy = policy.SMTPUTF8

    # Only pass the search settings that are configured so that the defaults
    # of search() apply to the others.
    search_options = {
        name: value
        for name, value in (
            ("criteria", general.get("search_criteria")),
            ("mailbox", general.get("search_mailbox")),
        )
        if value is not None
    }

    imap = _open_imap(
        imap_config.get("host"),
        imap_config.get("port"),
        imap_config.get("user"),
        imap_config.get("pass"),
        imap_security,
    )
    try:
        message_uid = search(imap, **search_options)
        if message_uid is None:
            raise SystemExit(1)
        message_content = fetch(imap, message_uid)
        if message_content is None:
            raise SystemExit(1)
        mark_as_read(imap, message_uid)
    finally:
        # Always close the session, including on the early exits above.
        imap.logout()

    # noinspection PyTypeChecker
    original_message: EmailMessage = message_from_bytes(message_content, policy=email_policy)
    message = _build_forward(original_message, sender, recipients, forwarded_text, email_policy)

    _send(
        message,
        smtp_config.get("host"),
        smtp_config.get("port"),
        smtp_config.get("user"),
        smtp_config.get("pass"),
        smtp_security,
    )


def _open_imap(
    host: Optional[str],
    port: Optional[int],
    user: Optional[str],
    password: Optional[str],
    security: "EmailSecurity",
) -> Union[IMAP4_SSL, IMAP4]:
    """Connect to the IMAP server with the requested security and log in."""
    factory = IMAP4_SSL if security == EmailSecurity.Tls else IMAP4
    # Without a configured port the default port of the client class is used.
    connection = factory(host) if port is None else factory(host, port)
    try:
        if security == EmailSecurity.Starttls:
            connection.starttls()
        connection.login(user, password)
    except Exception:
        # Do not leave the socket open when the handshake or login fails.
        connection.shutdown()
        raise
    return connection


def _header_text(message: EmailMessage, name: str) -> str:
    """Return header ``name`` of ``message`` as text, or "" when it is absent."""
    value = message[name]
    return "" if value is None else str(value)


def _build_forward(
    original_message: EmailMessage,
    sender: str,
    recipients: list,
    forwarded_text: str,
    email_policy: policy.EmailPolicy,
) -> EmailMessage:
    """Wrap the body of ``original_message`` into a new multipart/mixed message.

    The result has three parts: the configured notice text, a summary of the
    original headers (plain text with an HTML alternative) and the original
    body itself.
    """
    original_body = original_message.get_body()
    # get_body() returns None when the message has no suitable body part.
    maintype = None if original_body is None else original_body.get_content_maintype()
    subtype = None if original_body is None else original_body.get_content_subtype()
    if maintype != "text" or subtype not in ("plain", "html"):
        print('Content-Type of original message is neither "text/plain" nor "text/html"')
        raise SystemExit(1)
    original_content = original_body.get_payload(decode=True)

    # Absent headers become empty strings; escape() and the string
    # concatenation below would otherwise fail on None.
    from_header = _header_text(original_message, "From")
    to_header = _header_text(original_message, "To")
    date_header = _header_text(original_message, "Date")
    subject_header = _header_text(original_message, "Subject")

    message = EmailMessage(policy=email_policy)
    message["From"] = sender
    message["To"] = ", ".join(recipients)
    message["Subject"] = "Fwd: " + subject_header

    forwarded_headers_text = f"""----------------------------------------------------------------
From: {from_header}
To: {to_header}
Date: {date_header}
Subject: {subject_header}
"""
    forwarded_headers_html = f"""
<hr>
<b>From:</b> {escape(from_header)}<br>
<b>To:</b> {escape(to_header)}<br>
<b>Date:</b> {escape(date_header)}<br>
<b>Subject:</b> {escape(subject_header)}<br>
<br>
"""

    notice_part = EmailMessage(policy=email_policy)
    notice_part.set_content(forwarded_text, subtype="plain", cte="8bit")

    headers_part = EmailMessage(policy=email_policy)
    headers_part.set_content(forwarded_headers_text, subtype="plain", cte="8bit")
    headers_part.add_alternative(forwarded_headers_html, subtype="html", cte="8bit")

    original_part = EmailMessage(policy=email_policy)
    original_part.set_content(
        original_content,
        maintype=maintype,
        subtype=subtype,
        cte="base64",
    )
    # The body is attached as raw bytes, so carry over the declared charset;
    # without it the receiving client has to guess the text encoding.
    original_charset = original_body.get_content_charset()
    if original_charset is not None:
        original_part.set_param("charset", original_charset)

    message.make_mixed()
    message.set_payload([notice_part, headers_part, original_part])
    return message


def _send(
    message: EmailMessage,
    host: Optional[str],
    port: Optional[int],
    user: Optional[str],
    password: Optional[str],
    security: "EmailSecurity",
) -> None:
    """Log in to the SMTP server with the requested security and send ``message``."""
    factory = SMTP_SSL if security == EmailSecurity.Tls else SMTP
    # The context manager issues QUIT and closes the socket even when the
    # login or the delivery raises.
    with factory(host, port or 0) as smtp:
        if security == EmailSecurity.Starttls:
            smtp.starttls()
        smtp.login(user, password)
        smtp.send_message(message)
def email_security_from_str(security: Optional[str]) -> Optional["EmailSecurity"]:
    """Map a configuration string onto an ``EmailSecurity`` member.

    The comparison is case-insensitive.

    Args:
        security: value of a ``security`` setting; may be None when the
            setting is absent from the configuration.

    Returns:
        The matching ``EmailSecurity`` member, or None (after printing a
        hint) when the value is missing, not a string or not a known mode.
    """
    try:
        return EmailSecurity(security.lower())
    except (AttributeError, ValueError):
        # AttributeError: value is missing or not a string.
        # ValueError: string that is not a known security mode.
        # Anything else (e.g. KeyboardInterrupt) is left to propagate.
        print("Couldn't map email security correctly, check the configuration")
        return None
def search(imap: Union[IMAP4_SSL, IMAP4], criteria="ALL", mailbox="INBOX") -> Optional[Any]:
    """Select ``mailbox`` and return the last UID matching ``criteria``.

    Args:
        imap: logged-in IMAP connection.
        criteria: IMAP SEARCH criteria; None falls back to ``"ALL"``.
        mailbox: mailbox to select; None falls back to ``"INBOX"``.

    Returns:
        The last UID (bytes) of the search result, or None when the mailbox
        cannot be selected, the search fails or nothing matches.
    """
    # Callers may forward unset configuration values as None.
    if criteria is None:
        criteria = "ALL"
    if mailbox is None:
        mailbox = "INBOX"

    status, responses = imap.select(mailbox, False)
    if status != "OK":
        for response in responses:
            print(response)
        print("IMAP uid search failed")
        return None

    status, result = imap.uid("SEARCH", criteria)
    # The data list may be empty or hold None when the server sent no
    # SEARCH data; guard before calling split() on its first element.
    if status == "OK" and result and result[0]:
        uids = result[0].split()
        if uids:
            uid = uids[-1]
            print(f"IMAP uid: {uid.decode()}")
            return uid

    print("IMAP uid search failed")
    return None
def fetch(imap: Union[IMAP4_SSL, IMAP4], message_uid) -> Optional[Any]:
    """Fetch the complete RFC822 content of the message with ``message_uid``.

    Args:
        imap: logged-in IMAP connection with a selected mailbox.
        message_uid: UID of the message to fetch.

    Returns:
        The raw message bytes, or None (after printing a hint) when the
        fetch fails or the response carries no message data.
    """
    status, result = imap.uid("FETCH", message_uid, "(RFC822)")
    if status == "OK":
        # imaplib delivers literal data as (header, data) tuples; other
        # items are bare bytes, or None when nothing was returned. Taking
        # the first tuple avoids indexing into None or into a bytes object.
        for item in result or ():
            if isinstance(item, tuple) and len(item) > 1 and item[1] is not None:
                return item[1]
    print("IMAP fetch failed")
    return None
def mark_as_read(imap: Union[IMAP4_SSL, IMAP4], message_uid) -> None:
    """Add the seen flag to the message with ``message_uid`` and report the outcome.

    Args:
        imap: logged-in IMAP connection with a selected mailbox.
        message_uid: UID of the message to flag.
    """
    # Raw string: "\S" is not a valid escape sequence in a normal string
    # literal and triggers a warning; the value sent is unchanged.
    status, _result = imap.uid("STORE", message_uid, "+FLAGS", r"(\SEEN)")
    if status == "OK":
        print("Successfully marked as read")
    else:
        print("Failed to mark as read")
# Run the forwarder only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
# Sample configuration for the script above. It is loaded as
# "email-multiplexer.toml" from the directory that contains the script.
[general]
# Text placed in the first part of the forwarded message.
forwarded_text = "Forwarded by email-multiplexer"
# Addresses the message is forwarded to (joined into the "To" header).
recipients = [""]
# IMAP SEARCH criteria; the last UID of the search result is forwarded.
# NOTE(review): the FROM address below looks like a redacted placeholder;
# set the real sender address before use.
search_criteria = '(UNSEEN SUBJECT "Zugangscode" FROM "[email protected]")'
# Mailbox that is selected before searching.
search_mailbox = "INBOX"
# Value of the "From" header of the forwarded message.
sender = ""
# Connection used to search, fetch and flag the original message.
[imap]
host = ""
port = 993
user = ""
pass = ""
# "tls" / "starttls" / "unencrypted"
security = "tls"
# Connection used to send the forwarded message.
[smtp]
host = ""
port = 465
user = ""
pass = ""
# "tls" / "starttls" / "unencrypted"
security = "tls"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment