Last active
September 1, 2015 06:45
-
-
Save fzliu/86a471e46f8006a14bab to your computer and use it in GitHub Desktop.
A customizable, "black hole" mail server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import asyncore | |
from email import message_from_string | |
from email.iterators import typed_subpart_iterator | |
from email.mime.text import MIMEText | |
from email.utils import formataddr | |
import getpass | |
import logging | |
import os | |
import pprint | |
from smtpd import SMTPServer | |
import smtplib | |
import sys | |
# argparse | |
parser = argparse.ArgumentParser(description="Customizable SMTP server.", | |
usage="courier.py -c <command>") | |
parser.add_argument("-c", "--command", required=False, default="print", help="mail command (bounce, dump, print)") | |
parser.add_argument("-o", "--out", required=False, default=None, help="output path for dump") | |
# logging | |
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) | |
VALID_COMMANDS = ["bounce", "dump", "print"] | |
GMAIL_AUTH_FNAME = ".gmail_auth" | |
GMAIL_SMTP_SERVER = "smtp.gmail.com" | |
GMAIL_SMTP_PORT = 587 | |
COURIER_EMAIL_ADDR = "[email protected]" | |
def _bounce_mail(mail, server): | |
""" | |
Bounce mail back to the sender (with a friendly message). | |
""" | |
# check the destination addresses | |
if COURIER_EMAIL_ADDR in mail["addrs-to"]: | |
# compose the "bounce" message | |
append_txt = "Hey there! Here's your original message: \n" + "-"*20 | |
bounce_msg = MIMEText(append_txt + "\n\n" + mail["body"]) | |
bounce_msg["Subject"] = mail["subject"] | |
bounce_msg["From"] = COURIER_EMAIL_ADDR | |
bounce_msg["To"] = mail["addr-from"] | |
# send it | |
server.sendmail(COURIER_EMAIL_ADDR, mail["addr-from"], bounce_msg.as_string()) | |
logging.info("bounced email back to {0}".format(mail["addr-from"])) | |
def _dump_mail(mail, path): | |
""" | |
Courier handler that dumps all mail to a text file. | |
""" | |
with open(path, "a") as f: | |
pprint.pprint(mail, stream=f) | |
def _print_mail(mail, stream): | |
""" | |
Courier handler that prints all received mail. | |
""" | |
pprint.pprint(mail, stream=stream) | |
class Courier(SMTPServer, object): | |
""" | |
Courier class, built on top of Python's SMTPServer. | |
""" | |
def __init__(self, cb_fn, cb_data, *args, **kwargs): | |
super(Courier, self).__init__(*args, **kwargs) | |
self._cb_fn = cb_fn | |
self._cb_data = cb_data | |
def process_message(self, peer, mailfrom, rcpttos, data): | |
""" | |
Process a single inbound message. | |
""" | |
headers = message_from_string(data) | |
body = "" | |
# multipart | |
if headers.is_multipart(): | |
for part in typed_subpart_iterator(headers, "text", "plain"): | |
body += part.get_payload(decode=True).strip() | |
# message body | |
else: | |
body += headers.get_payload(decode=True).strip() | |
# compile the data | |
mail_data = { | |
"addr-from": mailfrom, # address only | |
"addrs-to": rcpttos, # addresses only | |
"sender": headers["from"], # includes name and address | |
"recipients": headers["to"], # includes names and addresses | |
"subject": headers["subject"], # subject | |
"body": body # body | |
} | |
logging.debug(mail_data) | |
return self._cb_fn(mail_data, self._cb_data) | |
def start_courier(args): | |
""" | |
Starts up a Courier instance. | |
""" | |
# get the command | |
command = args.command if args.command else "print" | |
assert command in VALID_COMMANDS, "please specify a valid commmand" | |
logging.info("specified command {0}".format(command)) | |
# outgoing email server | |
if command == "bounce" and os.path.isfile(GMAIL_AUTH_FNAME): | |
out_srv = smtplib.SMTP(GMAIL_SMTP_SERVER, GMAIL_SMTP_PORT, timeout=sys.maxint) | |
out_srv.ehlo() | |
out_srv.starttls() | |
with open(GMAIL_AUTH_FNAME) as f: | |
user = f.readline().strip() | |
passwd = getpass.getpass("Password for {0}:".format(user)) | |
is_pass_accepted = False | |
while not is_pass_accepted: | |
try: | |
out_srv.login(user, passwd) | |
is_pass_accepted = True | |
except smtplib.SMTPAuthenticationError: | |
passwd = getpass.getpass("Try again:") | |
else: | |
out_srv = None | |
# command and data (associated with command) map | |
cmd_map = { | |
"bounce": _bounce_mail, | |
"dump": _dump_mail, | |
"print": _print_mail | |
} | |
data_map = { | |
"bounce": out_srv, | |
"dump": args.out, | |
"print": sys.stderr | |
} | |
# start up | |
courier = Courier(cmd_map[command], data_map[command], | |
("0.0.0.0", 25), None) | |
try: | |
logging.info("starting Courier") | |
asyncore.loop() | |
except KeyboardInterrupt: | |
logging.info("stopping Courier") | |
if out_srv is not None: | |
out_srv.quit() | |
if __name__ == "__main__": | |
args = parser.parse_args() | |
start_courier(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment