Last active
December 3, 2025 13:27
-
-
Save mistivia/1536450a603530f46cb3f8c56e789753 to your computer and use it in GitHub Desktop.
Turn any IMAP/SMTP email service into a mailing list
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import imaplib | |
| import smtplib | |
| import time | |
| import os | |
| import email | |
| from email.header import decode_header | |
| from email.utils import parseaddr | |
| from email.mime.text import MIMEText | |
| from email.header import Header | |
| import json | |
# Reply body sent to anyone who writes to the list address without being a
# subscriber (English instructions followed by the same text in Chinese).
subscribe_help_text = """Hello,
You have not subscribed to this mailing list yet.
To subscribe, send a email with SUBJECT 'subscribe' to me.
To unsubscribe, send a email with SUBJECT 'unsubscribe' to me.
你好:
你还没有订阅本邮件列表。
想要订阅的话,再发一封邮件给我,主题设置为“subscribe”。
如果想取消订阅的话,发一封邮件给我,主题设置为“unsubscribe”。
"""

# Settings are loaded once, at import time, from config.json in the current
# working directory.  JSON text is UTF-8, so the encoding is given explicitly
# instead of relying on the platform's locale default (which would mangle a
# non-ASCII list name on some systems).
with open('config.json', 'r', encoding='utf-8') as fp:
    config = json.load(fp)

LIST_NAME = config['name']              # display name used in From headers
LIST_EMAIL = config['address']          # list address; also the login user
LIST_PASSWORD = config['password']      # password for both IMAP and SMTP login
IMAP_SERVER = config['imap-server']
IMAP_PORT = config['imap-port']
SMTP_SERVER = config['smtp-server']
SMTP_PORT = config['smtp-port']
SUBSCRIBER_FILE = config['subscribers']  # text file, one address per line
CHECK_INTERVAL = 15                      # seconds slept between inbox polls
def get_subscribers():
    """Return the subscriber addresses stored in SUBSCRIBER_FILE as a set.

    The file is read as UTF-8 text with one address per line; surrounding
    whitespace is stripped and blank lines are skipped.  An empty set is
    returned when the file does not exist.
    """
    if os.path.exists(SUBSCRIBER_FILE):
        with open(SUBSCRIBER_FILE, 'r', encoding='utf-8') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            return {entry for entry in stripped_lines if entry}
    return set()
def save_subscriber(email_addr, action='add'):
    """Add or remove one address and persist the subscriber list.

    Args:
        email_addr: Address to add or remove.  An empty value is ignored so
            that an unparseable sender never ends up in the list.
        action: 'add' to subscribe the address, 'remove' to unsubscribe it.
            Any other value leaves the membership unchanged (the file is
            still rewritten).

    The list is written to a temporary file next to SUBSCRIBER_FILE and then
    moved into place with os.replace(), so an interrupted write cannot leave
    a truncated subscriber file behind.  Addresses are written sorted to keep
    the file stable between runs.
    """
    if not email_addr:
        print("[Warn] Ignoring subscriber update for an empty address.")
        return

    subs = get_subscribers()
    if action == 'add':
        subs.add(email_addr)
        print(f"[Info] Added subscriber: {email_addr}")
    elif action == 'remove':
        if email_addr in subs:
            subs.remove(email_addr)
            print(f"[Info] Removed subscriber: {email_addr}")

    tmp_path = f"{SUBSCRIBER_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.writelines(f"{sub}\n" for sub in sorted(subs))
    # Swap the finished file in as a single step instead of truncating the
    # live file and refilling it.
    os.replace(tmp_path, SUBSCRIBER_FILE)
def decode_str(s):
    """Decode a (possibly RFC 2047 encoded) header value to a plain string.

    Args:
        s: Raw header value, or None when the header is absent.

    Returns:
        The decoded text; "" when ``s`` is None.  Always a ``str``.

    Every chunk returned by decode_header() is decoded and the results are
    concatenated, so headers made of several encoded words, or of encoded
    words mixed with plain text, are returned in full rather than cut off
    after the first chunk.
    """
    if s is None:
        return ""

    pieces = []
    for value, charset in decode_header(s):
        if not isinstance(value, bytes):
            # Header without any encoded word: already text.
            pieces.append(str(value))
            continue
        try:
            pieces.append(value.decode(charset or 'utf-8'))
        except (LookupError, UnicodeDecodeError):
            # Unknown charset label or bytes that do not match it: keep
            # whatever is readable instead of returning raw bytes.
            pieces.append(value.decode('utf-8', errors='replace'))
    return "".join(pieces)
def send_plain_text_email(receiver_email, subject, body):
    """Send a UTF-8 plain-text message from the list address to one recipient.

    Args:
        receiver_email: Recipient address.
        subject: Subject line (may contain non-ASCII text).
        body: Message body.

    Returns:
        "Email sent successfully!" on success, otherwise a string describing
        the error.  Errors are also printed, because callers in this file
        ignore the return value.
    """
    # Local import: formataddr is not among this file's top-level imports.
    from email.utils import formataddr

    msg = MIMEText(body, 'plain', 'utf-8')
    # formataddr() encodes only the display name and leaves the address
    # itself in plain ASCII.  Wrapping the whole "Name <addr>" string in a
    # utf-8 Header would turn the address into an encoded word as well.
    msg['From'] = formataddr((LIST_NAME, LIST_EMAIL))
    msg['To'] = receiver_email
    msg['Subject'] = Header(subject, 'utf-8')
    # Mark the message as an automatic reply (RFC 3834) so that other
    # auto-responders can choose not to answer it.
    msg['Auto-Submitted'] = 'auto-replied'

    try:
        # The context manager sends QUIT and closes the socket on every path,
        # including a failed login.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.login(LIST_EMAIL, LIST_PASSWORD)
            server.sendmail(LIST_EMAIL, [receiver_email], msg.as_string())
        return "Email sent successfully!"
    except smtplib.SMTPException as e:
        result = f"SMTP error occurred: {e}"
    except Exception as e:
        result = f"An error occurred: {e}"

    print(f"[Error] Could not send '{subject}' to {receiver_email}: {result}")
    return result
def send_email(recipients, msg):
    """Forward ``msg`` to every address in ``recipients`` over one SMTP session.

    Args:
        recipients: Iterable of recipient addresses; nothing happens when it
            is empty.
        msg: The message to forward.  Its 'To' header is overwritten for each
            recipient, so the object is modified in place.

    A failure for one recipient is printed and the loop continues with the
    next one.  A failure to connect or log in is printed and ends the send.
    The SMTP connection is closed on every path.
    """
    if not recipients:
        return

    try:
        print(f"[Sending] Starting bulk send to {len(recipients)} recipients...")
        # The context manager issues QUIT / closes the socket even when
        # login() or a later call raises.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.login(LIST_EMAIL, LIST_PASSWORD)
            for recipient in recipients:
                try:
                    # Each copy is addressed to exactly one subscriber.
                    if 'To' in msg:
                        del msg['To']
                    msg['To'] = recipient
                    server.sendmail(LIST_EMAIL, [recipient], msg.as_string())
                    print(f" -> Sent to: {recipient}")
                except Exception as e_inner:
                    print(f" -> Failed sending to {recipient}: {e_inner}")
    except Exception as e:
        print(f"[Error] SMTP Connection failed: {e}")
        return

    print("[Success] All emails processed.")
def process_email(msg):
    """Handle one incoming message and say what to do with it afterwards.

    Behaviour, in order:
      * no usable sender address, mail from the list's own address, or mail
        carrying an Auto-Submitted header other than "no": ignored;
      * subject 'subscribe': the sender is added and receives a confirmation;
      * subject 'ping': the sender receives 'PONG';
      * sender is not a subscriber: the help text is sent back;
      * subject 'unsubscribe': the sender is removed and notified;
      * anything else from a subscriber: forwarded to all subscribers with
        the From header rewritten to the list address.

    Subjects are compared after stripping whitespace and lower-casing.

    Args:
        msg: Parsed message (as produced by email.message_from_bytes).

    Returns:
        'delete' for command and help traffic, 'archive' for forwarded or
        ignored messages.
    """
    # Local import: formataddr is not among this file's top-level imports.
    from email.utils import formataddr

    from_header = msg.get("From")
    # str() flattens Header objects; a missing header yields an empty pair.
    sender_name, sender_email = parseaddr(str(from_header)) if from_header else ('', '')
    sender_name = decode_str(sender_name)
    sender_email = sender_email.strip().lower()
    subject = decode_str(msg.get("Subject"))
    print(f"[Received] From: {sender_email}, Subject: {subject}")

    if '@' not in sender_email:
        # Nobody to answer and nothing sensible to store as a subscriber.
        print("[Info] No usable sender address, ignoring message.")
        return 'archive'

    # Loop protection: never react to our own mail or to automatic replies
    # and bounces (RFC 3834 'Auto-Submitted').
    auto_submitted = str(msg.get('Auto-Submitted') or '').strip().lower()
    if sender_email == str(LIST_EMAIL).lower() or (
            auto_submitted and not auto_submitted.startswith('no')):
        print("[Info] Automatic or self-sent message, ignoring.")
        return 'archive'

    clean_subject = subject.strip().lower()
    if clean_subject == 'subscribe':
        save_subscriber(sender_email, 'add')
        send_plain_text_email(sender_email, 'Subscribed', 'Subscribed.\n\nTo unsubscribe, send a email with SUBJECT \'unsubscribe\' to me.\n')
        return 'delete'
    if clean_subject == 'ping':
        send_plain_text_email(sender_email, 'PONG', 'PONG')
        return 'delete'

    subscribers = get_subscribers()
    if sender_email not in subscribers:
        send_plain_text_email(sender_email, 'Mailing List Help', subscribe_help_text)
        return 'delete'
    if clean_subject == 'unsubscribe':
        save_subscriber(sender_email, 'remove')
        send_plain_text_email(sender_email, 'Unsubscribed', 'Unsubscribed')
        return 'delete'

    # The sender is a subscriber here, so the recipient set cannot be empty.
    # formataddr() quotes/encodes only the display name, which keeps the From
    # header valid when the sender's name contains non-ASCII or special
    # characters.
    who = f"{sender_name} ({sender_email})" if sender_name else sender_email
    new_from = formataddr((f"{who} via {LIST_NAME}", LIST_EMAIL))
    if 'From' in msg:
        msg.replace_header('From', new_from)
    else:
        msg.add_header('From', new_from)
    send_email(subscribers, msg)
    return 'archive'
def _list_entry_mailbox_name(entry):
    """Return the mailbox name (bytes) from one IMAP LIST response item, or None."""
    if isinstance(entry, tuple):
        # Name delivered as an IMAP literal: (header, name_bytes).
        return entry[1] if len(entry) > 1 else None
    if not entry:
        # imaplib reports an empty listing as [None].
        return None
    raw = entry.strip()
    if raw.endswith(b'"') and raw.count(b'"') >= 2:
        # Quoted name: take the text between the final pair of quotes.
        # (Names containing escaped quotes are not handled.)
        opening = raw.rfind(b'"', 0, len(raw) - 1)
        return raw[opening + 1:-1]
    # Unquoted atom: the last space-separated token.
    return raw.rsplit(b' ', 1)[-1]


def ensure_archive_folder_exists(mail, folder_name):
    """Make sure mailbox ``folder_name`` exists, creating it when missing.

    Args:
        mail: Logged-in IMAP connection providing ``list()`` and ``create()``.
        folder_name: Name of the mailbox to look for / create.

    Returns:
        True when the mailbox already exists or was created, False when the
        folder list could not be retrieved or creation failed.

    The mailbox name is extracted from each LIST line and compared exactly,
    so a folder such as 'archive-old' is not mistaken for 'archive', and an
    empty listing is handled.
    """
    print(f"Checking folder '{folder_name}'...")
    status, folders = mail.list()
    if status != 'OK':
        print(f"Failed to retrieve folder list: {folders}")
        return False

    wanted = folder_name.encode()
    folder_exists = any(
        _list_entry_mailbox_name(entry) == wanted for entry in (folders or [])
    )
    if folder_exists:
        print(f"Folder '{folder_name}' already exists.")
        return True

    print(f"Folder '{folder_name}' does not exist, creating...")
    status, response = mail.create(folder_name)
    if status == 'OK':
        print(f"Folder '{folder_name}' created successfully.")
        return True
    print(f"Failed to create folder '{folder_name}': {response}")
    return False
def archive_mail(mail, uid):
    """Copy one message from 'inbox' to 'archive' and flag the original deleted.

    Args:
        mail: IMAP connection providing ``select()`` and ``uid()``.
        uid: UID of the message; ``.decode()`` is called on it for logging,
            so a bytes value is expected.  A falsy value is a no-op.

    Each step prints its outcome and the function stops at the first step
    that does not return 'OK'.  The source message is only flagged
    ``\\Deleted`` here; no expunge is issued.  Always returns None.
    """
    inbox_name, archive_name = 'inbox', 'archive'

    if not uid:
        print("No mail UID to archive.")
        return

    select_status, select_reply = mail.select(inbox_name)
    if select_status != 'OK':
        print(f"Failed to select source mailbox '{inbox_name}': {select_reply}")
        return

    print(f"\nStarting archive for UID: {uid.decode()} to '{archive_name}'...")
    copy_status, copy_reply = mail.uid('COPY', uid, archive_name)
    if copy_status != 'OK':
        print(f"Mail copy failed! IMAP server returned: {copy_status} - {copy_reply}")
        return

    print("Mail copied successfully, marking source mail for deletion...")
    store_status, _ = mail.uid('STORE', uid, '+FLAGS.SILENT', '\\Deleted')
    if store_status != 'OK':
        print("Failed to mark for deletion! Please check source folder manually.")
def _process_unseen_messages(mail):
    """Fetch every unseen inbox message, handle it, then archive or delete it."""
    # Search by UID: archive_mail() and the STORE below address messages by
    # UID, so the identifiers must be UIDs rather than sequence numbers.
    status, messages = mail.uid('SEARCH', None, 'UNSEEN')
    if status != 'OK':
        return

    email_uids = (messages[0] or b'').split() if messages else []
    if email_uids:
        print(f"Found {len(email_uids)} new emails.")
    else:
        print("No email found.")

    expunge_flag = False
    for uid in email_uids:
        res, msg_data = mail.uid('FETCH', uid, '(RFC822)')
        # A message that disappeared between SEARCH and FETCH yields [None].
        if res != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"[Error] Could not fetch message UID {uid.decode()}.")
            continue
        msg = email.message_from_bytes(msg_data[0][1])
        action = process_email(msg)
        print("msg processed, post-process action: " + action)
        if action == 'archive':
            archive_mail(mail, uid)
            expunge_flag = True
        elif action == 'delete':
            mail.uid('STORE', uid, '+FLAGS', '\\Deleted')
            expunge_flag = True

    if expunge_flag:
        status, _response = mail.expunge()
        if status == 'OK':
            print('EXPUNGE success')
        else:
            print('EXPUNGE failed')


def main():
    """Run the mailing-list daemon: poll the inbox forever.

    On start-up the 'archive' mailbox is created if needed (errors at this
    stage are not caught and stop the program).  Afterwards, every
    CHECK_INTERVAL seconds a fresh IMAP connection is opened, unseen messages
    are processed, and the connection is closed again.  Errors inside one
    polling cycle are printed and the loop continues.
    """
    print("Starting Mailing List System...")
    # The IMAP4 context manager logs out when the block exits, including on
    # errors, so no connection is left open.
    with imaplib.IMAP4(IMAP_SERVER, IMAP_PORT) as mail:
        mail.login(LIST_EMAIL, LIST_PASSWORD)
        ensure_archive_folder_exists(mail, 'archive')

    while True:
        try:
            with imaplib.IMAP4(IMAP_SERVER, IMAP_PORT) as mail:
                mail.login(LIST_EMAIL, LIST_PASSWORD)
                status, response = mail.select('inbox')
                if status != 'OK':
                    print(f"[Error] Could not select inbox: {response}")
                else:
                    _process_unseen_messages(mail)
                    mail.close()
        except Exception as e:
            print(f"[Error] Main loop error: {e}")
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "name": "Simple Mailing List", | |
| "address": "list@example.org", | |
| "password": "YOUR_PASSWORD", | |
| "imap-server": "imap.example.org", | |
| "imap-port": 143, | |
| "smtp-server": "smtp.example.org", | |
| "smtp-port": 25, | |
| "subscribers": "subscribers.txt" | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment