Last active
November 26, 2023 20:13
-
-
Save masterflitzer/b587c38ee264ec52c664cf4981baf1db to your computer and use it in GitHub Desktop.
Forward the newest matching e-mail from an IMAP mailbox to multiple recipients over SMTP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import toml | |
from email import message_from_bytes, policy | |
from email.message import EmailMessage | |
from enum import Enum | |
from html import escape | |
from imaplib import IMAP4, IMAP4_SSL | |
from smtplib import SMTP, SMTP_SSL | |
from typing import Any, Optional, Union | |
class EmailSecurity(Enum):
    """Transport security modes accepted by the ``security`` configuration key.

    The member values are the lowercase strings expected in the
    configuration file.
    """

    # TLS from the first byte of the connection (IMAP4_SSL / SMTP_SSL).
    Tls = "tls"
    # Plain connection that is upgraded with STARTTLS before logging in.
    Starttls = "starttls"
    # Plain connection without any encryption.
    Unencrypted = "unencrypted"
def _require_section(config_data: dict[str, Any], name: str) -> dict[str, Any]:
    """Return configuration table ``name`` or exit with an error message."""
    section = config_data.get(name)
    if not isinstance(section, dict):
        print(f"Missing [{name}] section in the configuration")
        raise SystemExit(1)
    return section


def _open_imap(security, host, port, user, password) -> Union[IMAP4_SSL, IMAP4]:
    """Connect to the IMAP server with the requested security and log in."""
    if security == EmailSecurity.Tls:
        imap = IMAP4_SSL(host, port)
    else:
        imap = IMAP4(host, port)
        if security == EmailSecurity.Starttls:
            imap.starttls()
    imap.login(user, password)
    return imap


def _open_smtp(security, host, port, user, password) -> Union[SMTP_SSL, SMTP]:
    """Connect to the SMTP server with the requested security and log in."""
    if security == EmailSecurity.Tls:
        smtp = SMTP_SSL(host, port)
    else:
        smtp = SMTP(host, port)
        if security == EmailSecurity.Starttls:
            smtp.starttls()
    smtp.login(user, password)
    return smtp


def _header_text(message: EmailMessage, name: str) -> str:
    """Return header ``name`` as text, or an empty string when it is absent."""
    value = message[name]
    return "" if value is None else str(value)


def _build_forward(original, sender, recipients, forwarded_text, email_policy) -> EmailMessage:
    """Build the multipart/mixed message that forwards ``original``.

    The result has three parts: the configured introduction text, a
    plain/HTML summary of the original headers, and the original text body.
    Exits with status 1 when the original has no text/plain or text/html body.
    """
    body = original.get_body()
    maintype = body.get_content_maintype() if body is not None else None
    subtype = body.get_content_subtype() if body is not None else None
    if maintype != "text" or subtype not in ["plain", "html"]:
        print('Content-Type of original message is neither "text/plain" nor "text/html"')
        raise SystemExit(1)
    body_bytes = body.get_payload(decode=True)
    body_charset = body.get_content_charset()

    # Missing headers are rendered as empty strings instead of crashing
    # string concatenation / html.escape() on None.
    headers = {name: _header_text(original, name) for name in ("From", "To", "Date", "Subject")}

    message = EmailMessage(policy=email_policy)
    message["From"] = sender
    message["To"] = ", ".join(recipients)
    message["Subject"] = "Fwd: " + headers["Subject"]

    headers_text = "".join(
        [
            "----------------------------------------------------------------\n",
            f"From: {headers['From']}\n",
            f"To: {headers['To']}\n",
            f"Date: {headers['Date']}\n",
            f"Subject: {headers['Subject']}\n",
        ]
    )
    headers_html = "".join(
        [
            "\n<hr>\n",
            f"<b>From:</b> {escape(headers['From'])}<br>\n",
            f"<b>To:</b> {escape(headers['To'])}<br>\n",
            f"<b>Date:</b> {escape(headers['Date'])}<br>\n",
            f"<b>Subject:</b> {escape(headers['Subject'])}<br>\n",
            "<br>\n",
        ]
    )

    intro_part = EmailMessage(policy=email_policy)
    intro_part.set_content(forwarded_text or "", subtype="plain", cte="8bit")

    headers_part = EmailMessage(policy=email_policy)
    headers_part.set_content(headers_text, subtype="plain", cte="8bit")
    headers_part.add_alternative(headers_html, subtype="html", cte="8bit")

    # The body is forwarded as the original bytes; re-declare its charset so
    # that non-ASCII text is not interpreted with the us-ascii default.
    original_part = EmailMessage(policy=email_policy)
    original_part.set_content(
        body_bytes,
        maintype=maintype,
        subtype=subtype,
        cte="base64",
        params={"charset": body_charset} if body_charset else None,
    )

    message.make_mixed()
    for part in (intro_part, headers_part, original_part):
        message.attach(part)
    # make_mixed() does not add MIME-Version (only set_content() does), so
    # the top-level message needs it set explicitly.
    if "MIME-Version" not in message:
        message["MIME-Version"] = "1.0"
    return message


def main():
    """Forward the newest matching IMAP message to the configured recipients.

    Reads ``email-multiplexer.toml`` from the directory of this script, looks
    up one message via ``search``, downloads it with ``fetch``, marks it as
    read, and sends a forwarding message over SMTP. Exits with status 1 when
    the configuration is incomplete, no message is found, or the message has
    no usable text body.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_file = os.path.join(script_dir, "email-multiplexer.toml")
    config_data = toml.load(config_file)

    general = _require_section(config_data, "general")
    imap_config = _require_section(config_data, "imap")
    smtp_config = _require_section(config_data, "smtp")

    forwarded_text: Optional[str] = general.get("forwarded_text")
    sender: Optional[str] = general.get("sender")
    # Drop empty placeholders such as the [""] of the sample configuration.
    recipients: list[str] = [r for r in (general.get("recipients") or []) if r]

    imap_security = email_security_from_str(imap_config.get("security"))
    smtp_security = email_security_from_str(smtp_config.get("security"))

    email_policy = policy.SMTPUTF8

    # ``raise SystemExit`` is used instead of ``exit()``: the latter is
    # injected by the ``site`` module and is not guaranteed to exist.
    if imap_security is None or smtp_security is None:
        raise SystemExit(1)
    # Validate before touching the mailbox so that a broken configuration
    # never marks a message as read without forwarding it.
    if not sender or not recipients:
        print("Both sender and at least one recipient must be configured")
        raise SystemExit(1)

    # Only pass search settings that are configured, so the defaults of
    # search() apply to the others.
    search_args = {
        key: value
        for key, value in (
            ("criteria", general.get("search_criteria")),
            ("mailbox", general.get("search_mailbox")),
        )
        if value is not None
    }

    # The context manager logs out even when we bail out early.
    with _open_imap(
        imap_security,
        imap_config.get("host"),
        imap_config.get("port"),
        imap_config.get("user"),
        imap_config.get("pass"),
    ) as imap:
        message_uid = search(imap, **search_args)
        if message_uid is None:
            raise SystemExit(1)

        message_content = fetch(imap, message_uid)
        if message_content is None:
            raise SystemExit(1)

        mark_as_read(imap, message_uid)

    # noinspection PyTypeChecker
    original_message: EmailMessage = message_from_bytes(message_content, policy=email_policy)
    message = _build_forward(original_message, sender, recipients, forwarded_text, email_policy)

    # The context manager sends QUIT and closes the socket, also on errors.
    with _open_smtp(
        smtp_security,
        smtp_config.get("host"),
        smtp_config.get("port"),
        smtp_config.get("user"),
        smtp_config.get("pass"),
    ) as smtp:
        smtp.send_message(message)
def email_security_from_str(security: Optional[str]) -> Optional[EmailSecurity]:
    """Map a configuration string to its ``EmailSecurity`` member.

    The lookup is case-insensitive. Returns ``None`` (after printing a hint)
    when ``security`` is not a string or does not name a known mode.
    """
    try:
        return EmailSecurity(security.lower())
    except (AttributeError, ValueError):
        # AttributeError: value is missing or not a string (no ``lower``).
        # ValueError: the string is not one of the enum values.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not caught.
        print("Couldn't map email security correctly, check the configuration")
        return None
def search(imap: Union[IMAP4_SSL, IMAP4], criteria="ALL", mailbox="INBOX") -> Optional[Any]:
    """Select ``mailbox`` and return the last UID matching ``criteria``.

    Args:
        imap: Logged-in IMAP connection.
        criteria: IMAP SEARCH criteria; ``None`` is treated as ``"ALL"``.
        mailbox: Mailbox to select; ``None`` is treated as ``"INBOX"``.

    Returns:
        The last UID of the search result as bytes, or ``None`` when the
        mailbox cannot be selected, the search fails, or nothing matches.
    """
    # Callers reading optional settings may pass None explicitly, which would
    # otherwise bypass the parameter defaults.
    if criteria is None:
        criteria = "ALL"
    if mailbox is None:
        mailbox = "INBOX"

    status, responses = imap.select(mailbox, False)
    if status != "OK":
        for response in responses:
            print(response)
        print("IMAP uid search failed")
        return None

    status, result = imap.uid("SEARCH", criteria)
    # imaplib reports [None] when the server sent no untagged SEARCH reply,
    # so the first element must be checked before splitting it.
    if status == "OK" and result and result[0]:
        uids = result[0].split()
        if uids:
            uid = uids[-1]
            print(f"IMAP uid: {uid.decode()}")
            return uid

    print("IMAP uid search failed")
    return None
def fetch(imap: Union[IMAP4_SSL, IMAP4], message_uid) -> Optional[Any]:
    """Download the full RFC822 content of the message with ``message_uid``.

    Returns:
        The raw message bytes, or ``None`` (after printing a hint) when the
        FETCH command fails or the response carries no message literal.
    """
    status, result = imap.uid("FETCH", message_uid, "(RFC822)")
    if status == "OK":
        # The message literal arrives as an (envelope, data) tuple; other
        # entries may be None or plain bytes lines and must be skipped
        # rather than indexed.
        for item in result or ():
            if isinstance(item, tuple) and len(item) > 1 and item[1] is not None:
                return item[1]
    print("IMAP fetch failed")
    return None
def mark_as_read(imap: Union[IMAP4_SSL, IMAP4], message_uid) -> None:
    """Add the seen flag to the message with ``message_uid`` and report the outcome."""
    # Raw string: "\S" in a normal literal is an invalid escape sequence and
    # triggers a warning on current Python versions. The value sent to the
    # server is unchanged.
    status, _ = imap.uid("STORE", message_uid, "+FLAGS", r"(\SEEN)")
    if status == "OK":
        print("Successfully marked as read")
    else:
        print("Failed to mark as read")
# Run the forwarder only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[general] | |
forwarded_text = "Forwarded by email-multiplexer" | |
recipients = [""] | |
search_criteria = '(UNSEEN SUBJECT "Zugangscode" FROM "[email protected]")' | |
search_mailbox = "INBOX" | |
sender = "" | |
[imap] | |
host = "" | |
port = 993 | |
user = "" | |
pass = "" | |
# Connection security: "tls" (TLS from connection start), "starttls" (upgrade a plain connection), or "unencrypted"
security = "tls" | |
[smtp] | |
host = "" | |
port = 465 | |
user = "" | |
pass = "" | |
# Connection security: "tls" (TLS from connection start), "starttls" (upgrade a plain connection), or "unencrypted"
security = "tls" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment