Last active
October 27, 2023 15:22
-
-
Save ryancdotorg/a8f565b9e4f0902eb7b5cd4cdefeea0f to your computer and use it in GitHub Desktop.
Experimental DKIM rotate/revoke/repudiate script for Exim+Route53. I take no responsibility for its use.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import grp | |
import sys | |
import stat | |
import time | |
import hmac | |
import boto3 | |
import tempfile | |
from datetime import datetime, timedelta | |
from hashlib import md5, sha1, sha256 | |
from Crypto.PublicKey import RSA | |
from base64 import b64encode as b64e, b64decode as b64d | |
from binascii import hexlify, unhexlify | |
# Domain whose DKIM keys are managed, taken from the first command-line
# argument. The trailing root dot is stripped and the name is lowercased
# because exim looks the key file up under ${lc:DKIM_DOMAIN}.
if len(sys.argv) < 2:
    sys.exit('usage: %s DOMAIN' % sys.argv[0])
DOMAIN = sys.argv[1].rstrip('.').lower()

# HMAC key used to derive the daily selectors; it has to be the same value
# as DKIM_HMAC in the exim transport configuration.
SECRET = 'PUT_YOUR_RANDOM_SECRET_HERE'

# Directory that holds one private key file per selector and domain.
BASEDIR = '/etc/exim4/dkim_keys'
# In the exim "remote_smtp" transport set the following (assuming Debian):
# DKIM_DOMAIN = ${domain:$return_path}
# DKIM_HMAC = PUT_YOUR_RANDOM_SECRET_HERE
# DKIM_DATE = ${substr{0}{8}{$tod_logfile}}
# DKIM_SELECTOR = DKIM_DATE-${substr{0}{23}{${hmac{sha1}{DKIM_HMAC}{DKIM_DATE}}}}
# DKIM_FILE = /etc/exim4/dkim_keys/${lc:DKIM_SELECTOR}_${lc:DKIM_DOMAIN}.key
# DKIM_PRIVATE_KEY = ${if exists{DKIM_FILE}{DKIM_FILE}{0}}
# Route 53 client built from the local AWS profile named "dkim".
cli = boto3.Session(profile_name='dkim').client('route53')

# The DKIM records are kept in a hosted zone named _domainkey.<DOMAIN>.
zone_name = '_domainkey.' + DOMAIN + '.'
zone_list = cli.list_hosted_zones_by_name(DNSName=zone_name, MaxItems='1')

# The listing only *starts* at DNSName: the first entry may be some other
# zone, and the list may be empty. Both cases mean the zone does not exist,
# so report them the same way instead of failing with an IndexError.
hosted_zones = zone_list['HostedZones']
if not hosted_zones or hosted_zones[0]['Name'] != zone_name:
    raise Exception('zone %s not found!' % zone_name)
zone = hosted_zones[0]
zone_id = zone['Id']

# Waiter used after every record change: poll every 15 seconds, at most
# 20 times, until the change is reported as applied.
waiter = cli.get_waiter('resource_record_sets_changed')
waiter.config.delay = 15
waiter.config.max_attempts = 20
# This script is designed to work with exim, which as of the time of writing has | |
# built-in support for hmac, but only using md5 or sha1. | |
def hmac_sha1_hex(k, m):
    """Return the hex digest of HMAC-SHA1 over message ``m`` with key ``k``.

    SHA-1 is used because the selector has to be reproducible with exim's
    built-in hmac expansion, which only offers md5 and sha1.

    Both arguments may be byte strings or text; text is encoded as UTF-8
    first, so the function works whether the caller holds ``bytes`` or
    ``str`` values.
    """
    if not isinstance(k, bytes):
        k = k.encode('utf-8')
    if not isinstance(m, bytes):
        m = m.encode('utf-8')
    return hmac.new(k, m, sha1).hexdigest()
def dkim_pub(rsa):
    """Return the DKIM TXT record text that publishes ``rsa``'s public key.

    ``rsa`` must provide ``publickey().exportKey('DER')``; the DER bytes are
    base64-encoded and appended to the ``v=DKIM1;t=s;p=`` prefix.
    """
    der = rsa.publickey().exportKey('DER')
    # b64encode may return bytes; convert to the native str type so the
    # concatenation below never mixes bytes and text.
    encoded = str(b64e(der).decode('ascii'))
    return 'v=DKIM1;t=s;p=' + encoded
# per RFC6376 empty p= means "revoked", and n= is a notes field | |
def dkim_priv(rsa):
    """Return the DKIM TXT record text that repudiates ``rsa``.

    The record has an empty ``p=`` tag (which per RFC 6376 marks the key as
    revoked) and carries the private parameters e, p and q, each encoded
    with long_to_b64, in the ``n=`` notes tag.
    """
    encoded = tuple(long_to_b64(value) for value in (rsa.e, rsa.p, rsa.q))
    return 'v=DKIM1;t=s;p=;n=e:%s,p:%s,q:%s' % encoded
# generate selectors that can't be guessed in advance without the key | |
def date_to_selector(date):
    """Return the DKIM selector for ``date``.

    The selector is the day formatted as YYYYMMDD, a dash, and the first 23
    hex characters of the HMAC-SHA1 of that day string keyed with SECRET, so
    it cannot be predicted without knowing SECRET.
    """
    day = date.strftime('%Y%m%d')
    tag = hmac_sha1_hex(SECRET, day)[:23]
    return '%s-%s' % (day, tag)
def long_to_b64(l):
    """Base64-encode the non-negative integer ``l`` as big-endian bytes.

    The minimal number of bytes is used (no leading zero byte); zero encodes
    to the empty string. The result is a native ``str``.

    Raises:
        ValueError: if ``l`` is negative. Right-shifting a negative integer
            never reaches zero, so the loop below would not terminate.
    """
    if l < 0:
        raise ValueError('cannot encode a negative integer: %r' % (l,))
    a = bytearray()
    # Collect the bytes least-significant first, then flip to big-endian.
    while l:
        a.append(l & 255)
        l >>= 8
    a.reverse()
    return str(b64e(a).decode('ascii'))
def format_txt(txt):
    """Quote ``txt`` as a TXT record value made of strings of <= 255 chars.

    Each chunk of at most 255 characters is wrapped in double quotes and the
    quoted chunks are concatenated. Empty input yields a single empty quoted
    string rather than an unquoted empty value.
    """
    # https://aws.amazon.com/premiumsupport/knowledge-center/txtrdatatoolong-error/
    if not txt:
        return '""'
    return ''.join('"%s"' % txt[i:i + 255] for i in range(0, len(txt), 255))
def push_record(selector, txt):
    """Create or replace the TXT record for ``selector`` and wait for it.

    The record is named ``<selector>.<zone_name>``, has a TTL of 5 seconds
    and carries ``txt`` as its only value. The value is quoted with
    format_txt so that text longer than 255 characters is split into
    multiple strings instead of being sent as one over-long string.

    Returns whatever the module-level waiter returns once the change is
    reported as applied.
    """
    res = cli.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch={
            'Changes': [{
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': selector + '.' + zone_name,
                    'Type': 'TXT',
                    'TTL': 5,
                    'ResourceRecords': [
                        {'Value': format_txt(txt)},
                    ],
                },
            }],
        },
    )
    change_id = res['ChangeInfo']['Id']
    print('waiting on record propagation %s' % change_id)
    return waiter.wait(Id=change_id)
# as an intermediate step before publishing private parameters, the key is simply revoked | |
def revoke_key(selector):
    """Publish an empty ``p=`` record for ``selector`` unless already there.

    This is the intermediate step before the private parameters are
    published: the key is simply marked as revoked.
    """
    dkim = 'v=DKIM1;t=s;p='
    name = selector + '.' + zone_name
    res = cli.list_resource_record_sets(
        HostedZoneId=zone_id,
        StartRecordName=name,
        StartRecordType='TXT',
        MaxItems='1',
    )

    # The listing only starts at the requested name/type, so it can be empty
    # or begin with an unrelated record. Only trust the value when the entry
    # really is the TXT record of this selector; otherwise treat the current
    # value as unknown and push the revocation.
    val = None
    for rrset in res['ResourceRecordSets'][:1]:
        if rrset['Type'] == 'TXT' and rrset['Name'].lower() == name.lower():
            records = rrset.get('ResourceRecords') or [{}]
            val = records[0].get('Value')

    if val != '"%s"' % dkim:
        print(selector)
        print('push dkim revocation')
        push_record(selector, dkim)
# publish private parameters, allowing signatures on old mail to be forged | |
def repudiate_key(selector):
    """Publish the private parameters of ``selector``'s key, then delete it.

    Publishing the parameters allows signatures on old mail to be forged.
    Nothing happens when no key file for the selector exists in BASEDIR.
    The key file is removed only after the record has been pushed.
    """
    path = '%s/%s_%s.key' % (BASEDIR, selector, DOMAIN)
    if not os.path.isfile(path):
        return

    print(selector)

    print('load private key')
    with open(path, 'r') as fh:
        pem = fh.read()
    rsa = RSA.importKey(pem)

    print('generate dkim repudiation')
    record = dkim_priv(rsa)

    print('push dkim repudiation')
    push_record(selector, record)

    print('delete repudiated key file')
    os.remove(path)
def create_key(selector):
    """Generate, publish and store the key for ``selector`` if missing.

    Nothing happens when the key file already exists in BASEDIR. Otherwise a
    new RSA key is generated, its public record is pushed, and the private
    key is written to a temporary file in BASEDIR (owner root, group
    Debian-exim, mode 0440) which is then renamed into place, so a partially
    written key file is never visible under the final name.

    If writing or renaming fails, the descriptor is closed and the temporary
    file is removed so no stray private key material is left behind; the
    original exception is re-raised.
    """
    filename = '%s_%s.key' % (selector, DOMAIN)
    path = BASEDIR + '/' + filename
    if os.path.isfile(path):
        return

    print(selector)
    print('generate rsa key')
    # NOTE(review): 1024-bit keys are short by current standards; presumably
    # acceptable here because keys are rotated and later repudiated - confirm.
    rsa = RSA.generate(1024)
    print('generate dkim record')
    dkim = dkim_pub(rsa)
    print('push dkim record')
    push_record(selector, dkim)

    print('writing private key to temp file')
    tmp_fd, tmp_name = tempfile.mkstemp('', '.' + filename + '.', BASEDIR, True)
    try:
        try:
            os.write(tmp_fd, rsa.exportKey())
            os.fchown(tmp_fd, 0, grp.getgrnam('Debian-exim')[2])
            os.fchmod(tmp_fd, 0o0440)
            # Make sure the key is on disk before it becomes visible.
            os.fsync(tmp_fd)
        finally:
            os.close(tmp_fd)
        print('renaming temp file')
        os.rename(tmp_name, path)
    except BaseException:
        # After a successful rename the temp name no longer exists.
        if os.path.lexists(tmp_name):
            os.remove(tmp_name)
        raise
d_start = datetime.utcnow()

# expire keys
# Step back one day at a time, starting with yesterday, until reaching a day
# for which no key file exists; expiry processing starts from that day.
d = d_start
while True:
    d -= timedelta(days=1)
    filename = '%s_%s.key' % (date_to_selector(d), DOMAIN)
    path = '%s/%s' % (BASEDIR, filename)
    if not os.path.isfile(path):
        break

# publish private parameters after 10 days
repudiate_before = d_start - timedelta(days=10)
while d < repudiate_before:
    repudiate_key(date_to_selector(d))
    d += timedelta(days=1)

# public key revocation after 7 days
revoke_before = d_start - timedelta(days=7)
while d < revoke_before:
    revoke_key(date_to_selector(d))
    d += timedelta(days=1)

# create keys for today and the following 27 days
for offset in range(28):
    create_key(date_to_selector(d_start + timedelta(days=offset)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment