Last active
February 22, 2020 19:43
-
-
Save danielrichman/aa27c1e948e0c1212d4c to your computer and use it in GitHub Desktop.
templated mail merging with postgres & jinja2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
query: SELECT * FROM people WHERE condition | |
sender: | |
name: Daniel Richman | |
email: [email protected] | |
to_email_keys: [homeemail, workemail] | |
to_name_template: | | |
{{ firstname }} {{ surname }} | |
subject_template: Hello {{ firstname }} | |
body_template: | | |
Hello, {% include "to_name" %} | |
I am an autogenerated email. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import unicode_literals, print_function, division | |
import sys | |
import os | |
import email.mime.text | |
import email.utils | |
import email.generator | |
import smtplib | |
import argparse | |
import textwrap | |
import yaml | |
import jinja2 | |
import jinja2.ext | |
import psycopg2 | |
import psycopg2.extras | |
import psycopg2.extensions | |
class RewrapExtension(jinja2.ext.Extension): | |
tags = set(['rewrap']) | |
def parse(self, parser): | |
# first token is 'rewrap' | |
lineno = parser.stream.next().lineno | |
if parser.stream.current.type != 'block_end': | |
width = parser.parse_expression() | |
else: | |
width = jinja2.nodes.Const(78) | |
body = parser.parse_statements(['name:endrewrap'], drop_needle=True) | |
call = self.call_method('_rewrap', [width]) | |
return jinja2.nodes.CallBlock(call, [], [], body).set_lineno(lineno) | |
def _rewrap(self, width, caller): | |
contents = caller() | |
lines = [line.strip() for line in contents.splitlines()] | |
lines.append('') | |
paragraphs = [] | |
start = 0 | |
while start != len(lines): | |
end = lines.index('', start) | |
if start != end: | |
paragraph = ' '.join(lines[start:end]) | |
paragraphs.append(paragraph) | |
start = end + 1 | |
new_lines = [] | |
for paragraph in paragraphs: | |
if new_lines: | |
new_lines.append('') | |
new_lines += textwrap.wrap(paragraph, width) | |
# under the assumption that there will be a newline immediately after | |
# the endrewrap block, don't put a newline on the end. | |
return '\n'.join(new_lines) | |
class EmailTemplate(object): | |
def __init__(self, filename): | |
with open(filename) as f: | |
d = yaml.safe_load(f) | |
if not isinstance(d, dict): | |
raise TypeError("{0} did not contain a dictionary".format(filename)) | |
self.query = d["query"] | |
self.to_email_keys = d["to_email_keys"] | |
sender = d["sender"] | |
self.sender = (sender["name"], sender["email"]) | |
loader = jinja2.DictLoader({ | |
"to_name": d["to_name_template"].strip(), | |
"body": d["body_template"].strip(), | |
"subject": d["subject_template"].strip() | |
}) | |
jinja_env = jinja2.Environment( | |
undefined=jinja2.StrictUndefined, | |
loader=loader, | |
extensions=['jinja2.ext.with_', RewrapExtension] | |
) | |
self.to_name_template = jinja_env.get_template("to_name") | |
self.body_template = jinja_env.get_template("body") | |
self.subject_template = jinja_env.get_template("subject") | |
def run_query(self): | |
conn = psycopg2.connect(database="dbname") | |
try: | |
conn.set_client_encoding('UTF8') | |
for type in (psycopg2.extensions.UNICODE, psycopg2.extensions.UNICODEARRAY): | |
psycopg2.extensions.register_type(type, conn) | |
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
cur.execute(self.query) | |
for row in cur: | |
yield row | |
finally: | |
conn.close() | |
def row_recipient_info(self, row): | |
to_name = self.to_name_template.render(**row) | |
to_emails = [row[k] for k in self.to_email_keys if row[k]] | |
if not to_emails: | |
raise Exception("All emails were empty?", row) | |
return (to_name, to_emails) | |
def row_render(self, row): | |
row = row.copy() | |
(to_name, to_emails) = self.row_recipient_info(row) | |
row["to_name"] = to_name | |
subject = self.subject_template.render(**row) | |
row["subject"] = subject | |
body = self.body_template.render(**row) | |
message = email.mime.text.MIMEText(unicode(body), _charset='utf-8') | |
message["From"] = email.utils.formataddr(self.sender) | |
message["To"] = ", ".join(email.utils.formataddr((to_name, e)) for e in to_emails) | |
message["Subject"] = subject | |
return (to_emails, message) | |
def row_send(self, row, override_recipients=None): | |
to_emails, message = self.row_render(row) | |
s = smtplib.SMTP('localhost') | |
if override_recipients: | |
to_emails = override_recipients | |
s.sendmail(self.sender[1], to_emails, message.as_string()) | |
s.quit() | |
def argparser(): | |
argparser = argparse.ArgumentParser(prog='mailmerge') | |
argparser.add_argument('template_name', help="template filename", metavar="mymail.yaml") | |
subparsers = argparser.add_subparsers(help='action') | |
send_all = subparsers.add_parser('send-all') | |
send_all.add_argument("--confirm", action="store_true") | |
send_all.set_defaults(action="send_all") | |
test_recipients = subparsers.add_parser('test-recipients') | |
test_recipients.set_defaults(action="test_recipients") | |
test_to_me = subparsers.add_parser('test-to-me') | |
test_to_me.add_argument("search_key", metavar="mid") | |
test_to_me.add_argument("search_value", metavar="123") | |
test_to_me.set_defaults(action="test_to_me") | |
test_display = subparsers.add_parser('test-display') | |
test_display.add_argument("search_key", metavar="mid") | |
test_display.add_argument("search_value", metavar="123") | |
test_display.set_defaults(action="test_display") | |
return argparser | |
def main(args): | |
template = EmailTemplate(args.template_name) | |
if args.action == "test_recipients": | |
for row in template.run_query(): | |
to_name, to_emails = template.row_recipient_info(row) | |
print(to_name.ljust(30), *to_emails) | |
elif args.action == "test_display": | |
generator = email.generator.DecodedGenerator(sys.stdout, False) | |
for row in template.run_query(): | |
if str(row[args.search_key]) == args.search_value: | |
to_emails, message = template.row_render(row) | |
print("RCPT TO:", *to_emails) | |
generator.flatten(message) | |
elif args.action == "test_to_me": | |
recipient = os.getlogin() + '@localhost' | |
count = 0 | |
for row in template.run_query(): | |
if str(row[args.search_key]) == args.search_value: | |
template.row_send(row, override_recipients=[recipient]) | |
count += 1 | |
print("Sent", count, "emails") | |
elif args.action == "send_all" and not args.confirm: | |
print("Are you really really sure? Provide --confirm to continue.") | |
elif args.action == "send_all" and args.confirm: | |
count = 0 | |
try: | |
for row in template.run_query(): | |
template.row_send(row) | |
count += 1 | |
finally: | |
print("Sent", count, "emails") | |
else: | |
assert False | |
if __name__ == "__main__": | |
main(argparser().parse_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment