Skip to content

Instantly share code, notes, and snippets.

@mistivia
Last active December 3, 2025 13:27
Show Gist options
  • Select an option

  • Save mistivia/1536450a603530f46cb3f8c56e789753 to your computer and use it in GitHub Desktop.

Select an option

Save mistivia/1536450a603530f46cb3f8c56e789753 to your computer and use it in GitHub Desktop.
Turn any imap/smtp email service into a mailing list
import imaplib
import smtplib
import time
import os
import email
from email.header import decode_header
from email.utils import parseaddr
from email.mime.text import MIMEText
from email.header import Header
import json
subscribe_help_text = """Hello,
You have not subscribed to this mailing list yet.
To subscribe, send a email with SUBJECT 'subscribe' to me.
To unsubscribe, send a email with SUBJECT 'unsubscribe' to me.
你好:
你还没有订阅本邮件列表。
想要订阅的话,再发一封邮件给我,主题设置为“subscribe”。
如果想取消订阅的话,发一封邮件给我,主题设置为“unsubscribe”。
"""
config = {}
with open('config.json', 'r') as fp:
config = json.load(fp)
LIST_NAME = config['name']
LIST_EMAIL = config['address']
LIST_PASSWORD = config['password']
IMAP_SERVER = config['imap-server']
IMAP_PORT = config['imap-port']
SMTP_SERVER = config['smtp-server']
SMTP_PORT = config['smtp-port']
SUBSCRIBER_FILE = config['subscribers']
CHECK_INTERVAL = 15
def get_subscribers():
if not os.path.exists(SUBSCRIBER_FILE):
return set()
with open(SUBSCRIBER_FILE, 'r', encoding='utf-8') as f:
return set(line.strip() for line in f if line.strip())
def save_subscriber(email_addr, action='add'):
subs = get_subscribers()
if action == 'add':
subs.add(email_addr)
print(f"[Info] Added subscriber: {email_addr}")
elif action == 'remove':
if email_addr in subs:
subs.remove(email_addr)
print(f"[Info] Removed subscriber: {email_addr}")
with open(SUBSCRIBER_FILE, 'w', encoding='utf-8') as f:
for sub in subs:
f.write(f"{sub}\n")
def decode_str(s):
if s is None:
return ""
value, charset = decode_header(s)[0]
if charset:
try:
return value.decode(charset)
except:
return value
if isinstance(value, bytes):
try:
return value.decode('utf-8')
except:
return str(value)
return str(value)
def send_plain_text_email(receiver_email, subject, body):
msg = MIMEText(body, 'plain', 'utf-8')
msg['From'] = Header(f"{LIST_NAME} <{LIST_EMAIL}>", 'utf-8')
msg['To'] = Header(receiver_email, 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
server = None
try:
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
server.login(LIST_EMAIL, LIST_PASSWORD)
server.sendmail(LIST_EMAIL, [receiver_email], msg.as_string())
return "Email sent successfully!"
except smtplib.SMTPException as e:
return f"SMTP error occurred: {e}"
except Exception as e:
return f"An error occurred: {e}"
finally:
if server:
server.quit()
def send_email(recipients, msg):
if not recipients:
return
try:
print(f"[Sending] Starting bulk send to {len(recipients)} recipients...")
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
server.login(LIST_EMAIL, LIST_PASSWORD)
for recipient in recipients:
try:
if 'To' in msg:
del msg['To']
msg['To'] = recipient
server.sendmail(LIST_EMAIL, [recipient], msg.as_string())
print(f" -> Sent to: {recipient}")
except Exception as e_inner:
print(f" -> Failed sending to {recipient}: {e_inner}")
server.quit()
print("[Success] All emails processed.")
except Exception as e:
print(f"[Error] SMTP Connection failed: {e}")
def process_email(msg):
from_header = msg.get("From")
sender_name, sender_email = parseaddr(from_header)
sender_name = decode_str(sender_name)
sender_email = sender_email.lower()
subject = decode_str(msg.get("Subject"))
print(f"[Received] From: {sender_email}, Subject: {subject}")
clean_subject = subject.strip().lower()
if clean_subject == 'subscribe':
save_subscriber(sender_email, 'add')
send_plain_text_email(sender_email, 'Subscribed', 'Subscribed.\n\nTo unsubscribe, send a email with SUBJECT \'unsubscribe\' to me.\n')
return 'delete'
elif clean_subject == 'ping':
send_plain_text_email(sender_email, 'PONG', 'PONG')
return 'delete'
else:
subscribers = get_subscribers()
if sender_email not in subscribers:
send_plain_text_email(sender_email, 'Mailing List Help', subscribe_help_text)
return 'delete'
if clean_subject == 'unsubscribe':
save_subscriber(sender_email, 'remove')
send_plain_text_email(sender_email, 'Unsubscribed', 'Unsubscribed')
return 'delete'
else:
recipients = subscribers
if not recipients:
print("[Info] No recipients to forward to.")
return 'archive'
new_from = f"{sender_name} ({sender_email}) via {LIST_NAME} <{LIST_EMAIL}>"
if 'From' in msg:
msg.replace_header('From', new_from)
else:
msg.add_header('From', new_from)
send_email(recipients, msg)
return 'archive'
def ensure_archive_folder_exists(mail, folder_name):
print(f"Checking folder '{folder_name}'...")
status, folders = mail.list()
if status != 'OK':
print(f"Failed to retrieve folder list: {folders}")
return False
folder_exists = any(folder_name.encode() in f for f in folders)
if folder_exists:
print(f"Folder '{folder_name}' already exists.")
return True
else:
print(f"Folder '{folder_name}' does not exist, creating...")
status, response = mail.create(folder_name)
if status == 'OK':
print(f"Folder '{folder_name}' created successfully.")
return True
else:
print(f"Failed to create folder '{folder_name}': {response}")
return False
def archive_mail(mail, uid):
source_folder = 'inbox'
target_folder = 'archive'
if not uid:
print("No mail UID to archive.")
return
status, response = mail.select(source_folder)
if status != 'OK':
print(f"Failed to select source mailbox '{source_folder}': {response}")
return
uid_string = uid
print(f"\nStarting archive for UID: {uid_string.decode()} to '{target_folder}'...")
status, response = mail.uid('COPY', uid_string, target_folder)
if status != 'OK':
print(f"Mail copy failed! IMAP server returned: {status} - {response}")
return
print("Mail copied successfully, marking source mail for deletion...")
status, response = mail.uid('STORE', uid_string, '+FLAGS.SILENT', '\\Deleted')
if status != 'OK':
print(f"Failed to mark for deletion! Please check source folder manually.")
return
def main():
print("Starting Mailing List System...")
mail = imaplib.IMAP4(IMAP_SERVER, IMAP_PORT)
mail.login(LIST_EMAIL, LIST_PASSWORD)
ensure_archive_folder_exists(mail, 'archive')
mail.logout()
while True:
try:
mail = imaplib.IMAP4(IMAP_SERVER, IMAP_PORT)
mail.login(LIST_EMAIL, LIST_PASSWORD)
mail.select('inbox')
status, messages = mail.search(None, 'UNSEEN')
expunge_flag = False
if status == 'OK':
email_ids = messages[0].split()
if email_ids:
print(f"Found {len(email_ids)} new emails.")
else:
print("No email found.")
for num in email_ids:
res, msg_data = mail.fetch(num, '(RFC822)')
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
action = process_email(msg)
print("msg processed, post-process action: " + action)
if action == 'archive':
archive_mail(mail, num)
expunge_flag = True
elif action == 'delete':
mail.uid('STORE', num, '+FLAGS', '\\Deleted')
expunge_flag = True
if expunge_flag:
status, repsonse = mail.expunge()
if status == 'OK':
print('EXPUNGE success')
else:
print('EXPUNGE failed')
mail.close()
mail.logout()
except Exception as e:
print(f"[Error] Main loop error: {e}")
time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
main()
{
"name": "Simple Mailing List",
"address": "[email protected]",
"password": "YOUR_PASSWORD",
"imap-server": "imap.example.org",
"imap-port": 143,
"smtp-server": "smtp.example.org",
"smtp-port": 25,
"subscribers": "subscribers.txt"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment