Skip to content

Instantly share code, notes, and snippets.

@href
Created February 24, 2014 10:32
Show Gist options
  • Select an option

  • Save href/9185284 to your computer and use it in GitHub Desktop.

Select an option

Save href/9185284 to your computer and use it in GitHub Desktop.
Tests for Seantis Postfix
import time
import unittest
import poplib
import textwrap
from uuid import uuid4
from email import Encoders
from email.base64mime import encode
from email.MIMEBase import MIMEBase
from email.mime.text import MIMEText
from email.MIMEMultipart import MIMEMultipart
from smtplib import SMTP, SMTPRecipientsRefused, SMTPException
from imbox import Imbox
from sievelib import managesieve
from imaplib2 import IMAP4
# seconds to pause after sending a mail before looking at the mailboxes,
# leaving time for the clamav and spamassassin processing on the server
processing_timeout = 1.0
# Tests run against a live mail server, using SMTP, IMAP, POP3 and
# ManageSieve clients.
class TestPostfixServer(unittest.TestCase):
# hostname of the mail server all tests connect to
server = 'mail.seantis.dev'
# hostname the tests announce in their HELO/EHLO greeting
client = 'example.org'
def encode_auth(self, username, password):
    """Return the base64 token for an ``AUTH PLAIN`` command.

    The token is ``NUL + username + NUL + password`` encoded as base64 on
    a single line (no line breaks), returned as a native string.
    """
    # base64.b64encode exists on Python 2 and 3 alike, whereas the
    # email.base64mime.encode alias is only available on Python 2
    import base64

    credentials = '\0{}\0{}'.format(username, password)
    if not isinstance(credentials, bytes):
        # text string (Python 3): base64 works on bytes
        credentials = credentials.encode('utf-8')
    token = base64.b64encode(credentials)
    # hand back a native str so it can be passed on as a command argument
    return token if isinstance(token, str) else token.decode('ascii')
def smtp(self, server=None, port=25):
    """Open an SMTP connection for the running test.

    :param server: host to connect to, defaults to ``self.server``
    :param port: port to connect to
    :return: the connected ``smtplib.SMTP`` instance; its socket is closed
        automatically once the current test has finished
    """
    connection = SMTP()
    # registered before connecting so the socket is released no matter how
    # the test ends; SMTP.close() is harmless on an unopened connection
    self.addCleanup(connection.close)
    connection.connect(server or self.server, port)
    return connection
def send_mail(
    self, sender, recipient, subject, body,
    user=None, pw=None, attachments=None
):
    """Send a mail through the submission port (587) using STARTTLS.

    :param sender: envelope sender, also used as ``From`` header
    :param recipient: envelope recipient, also used as ``To`` header
    :param subject: value of the ``Subject`` header
    :param body: plain text of the message
    :param user: login name, a default test account is used if empty
    :param pw: login password, defaults to ``'test'`` if empty
    :param attachments: optional sequence of payloads, each one is attached
        base64 encoded as ``application/octet-stream`` named ``file_<index>``
    :return: the result of ``SMTP.sendmail``
    """
    # None instead of a shared mutable list as default argument
    attachments = list(attachments or ())

    # this is how a client should do it usually, so we do it as well
    smtp = self.smtp(port=587)
    try:
        smtp.starttls()
        smtp.login(user or '[email protected]', pw or 'test')

        if attachments:
            # the first positional argument of MIMEMultipart is the MIME
            # subtype, not the text, so the body is attached as its own part
            msg = MIMEMultipart()
            msg.attach(MIMEText(body))
        else:
            msg = MIMEText(body)

        msg['From'] = sender
        msg['To'] = recipient
        msg['Subject'] = subject

        for ix, attachment in enumerate(attachments):
            part = MIMEBase('application', "octet-stream")
            part.set_payload(attachment)
            Encoders.encode_base64(part)
            part.add_header(
                'Content-Disposition',
                'attachment; filename="file_{}"'.format(ix)
            )
            msg.attach(part)

        return smtp.sendmail(sender, recipient, msg.as_string())
    finally:
        # close() only drops the socket, so an error raised above is not
        # replaced by a failure during cleanup
        smtp.close()
def wait_until_processed(self):
    """Block for ``processing_timeout`` seconds."""
    delay = processing_timeout
    time.sleep(delay)
def test_connect(self):
    # connecting on port 25 has to yield a 220 greeting with the banner
    connection = SMTP()
    status, banner = connection.connect(self.server, 25)
    self.assertEqual(status, 220)
    self.assertEqual(banner, 'mail.seantis.ch ESMTP Postfix')
def test_login_ehlo(self):
    # STARTTLS, EHLO, then login -- the test passes if nothing raises
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')
def test_login_helo(self):
    session = self.smtp()
    session.starttls()
    session.helo(self.client)

    # SMTP.login cannot be used after a HELO greeting because of a bug in
    # python (http://bugs.python.org/issue6683), so AUTH PLAIN is sent
    # by hand
    credentials = self.encode_auth('[email protected]', 'test')
    session.docmd('AUTH PLAIN', credentials)
def test_helo_required(self):
    # MAIL FROM without a preceding HELO/EHLO has to be answered with 503
    reply = self.smtp().docmd("MAIL FROM: <[email protected]>")
    status, text = reply
    self.assertIn("send HELO/EHLO first", text)
    self.assertEqual(status, 503)
def test_not_an_open_relay(self):
    # an unauthenticated client announcing itself as google.ch must get
    # its recipient refused with a relay error
    session = self.smtp()
    session.ehlo('google.ch')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Relay access denied', text)
    self.assertEqual(status, 554)
def test_not_send_to_self_unauthenticated(self):
    # without a login the sender address is refused right at MAIL FROM
    session = self.smtp()
    session.ehlo('example.org')

    status, text = session.docmd('mail from: [email protected]')
    self.assertIn('Sender address rejected: not logged in', text)
    self.assertEqual(status, 553)
def test_authenticated_only_recipients(self):
    # TLS but no login: the recipient has to be refused with 554
    session = self.smtp()
    session.starttls()
    session.ehlo('example.org')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Recipient address rejected: Access denied', text)
    self.assertEqual(status, 554)
def test_authenticated_only_recipients_logged_in(self):
    # same exchange as the unauthenticated variant, but after a login the
    # recipient has to be accepted
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Ok', text)
    self.assertEqual(status, 250)
def test_invalid_client_hostname(self):
    # greeting as google.ch has to lead to a 450 client host rejection
    # once the recipient is given
    session = self.smtp()
    session.ehlo('google.ch')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Client host rejected', text)
    self.assertEqual(status, 450)
def test_invalid_recipient(self):
    # a logged in user addressing an unknown user gets a 550
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Recipient address rejected: User unknown', text)
    self.assertEqual(status, 550)
def test_non_fqdn_recipient(self):
    # a recipient without a domain part has to be refused with 504
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: user')
    self.assertIn('need fully-qualified address', text)
    self.assertEqual(status, 504)
def test_unknown_recipient_domain(self):
    # a recipient whose domain cannot be found is answered with 450
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Domain not found', text)
    self.assertEqual(status, 450)
def test_smtp_without_tls(self):
    # logging in without STARTTLS has to raise an SMTPException
    session = self.smtp()
    session.ehlo(self.client)

    with self.assertRaises(SMTPException):
        session.login('[email protected]', 'test')
def test_valid_send_ssl(self):
    # EHLO first, then STARTTLS and login: the recipient is accepted
    session = self.smtp()
    session.ehlo(self.client)
    session.starttls()
    session.login('[email protected]', 'test')
    session.docmd('mail from: [email protected]')

    status, text = session.docmd('rcpt to: [email protected]')
    self.assertIn('Ok', text)
    self.assertEqual(status, 250)
def test_disable_vrfy_command(self):
    # even a logged in user gets a 502 for VRFY
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)
    session.login('[email protected]', 'test')

    status, text = session.verify('[email protected]')
    self.assertIn('VRFY command is disabled', text)
    self.assertEqual(status, 502)
def test_invalid_helos(self):
    # EHLO names sent in order on one connection, each paired with the
    # text expected in the 554 answer
    rejected_helos = (
        ('mail.seantis.ch', "Don't use my hostname"),
        ('iris.seantis.ch', "Don't use my hostname"),
        ('[192.168.101.119]', "Don't use my IP address"),
        ('192.168.101.119', 'Your software is not RFC 2821 compliant'),
    )

    session = self.smtp()
    for helo_name, expected_text in rejected_helos:
        status, text = session.ehlo(helo_name)
        self.assertIn(expected_text, text)
        self.assertEqual(status, 554)
def test_sender_spoofing(self):
    # each scenario runs on a fresh connection:
    # (login, MAIL FROM command, expected reply text, expected code)
    scenarios = (
        # users may not send emails with the address of another user
        (
            '[email protected]',
            'mail from: [email protected]',
            'Sender address rejected: not owned by user',
            553,
        ),
        # users which receive aliased mails may send emails through them
        (
            '[email protected]',
            'mail from: [email protected]',
            'Ok',
            250,
        ),
        # [email protected] is allowed to send for the other users
        (
            '[email protected]',
            'mail from: [email protected]',
            'Ok',
            250,
        ),
        # [email protected] should still be able to send for himself
        (
            '[email protected]',
            'mail from: [email protected]',
            'Ok',
            250,
        ),
    )

    for login, command, expected_text, expected_code in scenarios:
        session = self.smtp()
        session.starttls()
        session.ehlo(self.client)
        session.login(login, 'test')

        status, text = session.docmd(command)
        self.assertIn(expected_text, text)
        self.assertEqual(status, expected_code)
def test_any_sender_does_not_receive(self):
    # the [email protected] may send as any other address, but he will
    # never receive emails in these tests
    account = dict(username='[email protected]', password='test', ssl=True)
    mailbox = Imbox(self.server, **account)

    received = list(mailbox.messages())
    self.assertEqual(len(received), 0)
def test_tls(self):
    # a login after STARTTLS has to be confirmed with a 235 answer
    session = self.smtp()
    session.starttls()
    session.ehlo(self.client)

    status, text = session.login('[email protected]', 'test')
    self.assertIn('Authentication successful', text)
    self.assertEqual(status, 235)
def test_tls_forced_on_submission(self):
    # using submission on port 587 requires TLS
    session = self.smtp(port=587)

    def auth_plain():
        # issue AUTH PLAIN by hand and return the (code, message) answer
        return session.docmd(
            'AUTH PLAIN', self.encode_auth('[email protected]', 'test')
        )

    # before STARTTLS the authentication has to be refused
    session.ehlo(self.client)
    status, text = auth_plain()
    self.assertIn('Must issue a STARTTLS command first', text)
    self.assertEqual(status, 530)

    # once the connection is encrypted the same command succeeds
    session.starttls()
    session.ehlo(self.client)
    status, text = auth_plain()
    self.assertIn('Authentication successful', text)
    self.assertEqual(status, 235)
def test_imap_auth(self):
    # logging in over ssl has to leave us with a usable connection object
    account = dict(username='[email protected]', password='test', ssl=True)
    mailbox = Imbox(self.server, **account)
    self.assertTrue(mailbox.connection)
def test_imap_auth_no_plain(self):
    # a login over an unencrypted connection has to be refused.
    # assertRaises is used instead of try/except/else with a bare assert,
    # so the check is not lost when python runs with -O
    with self.assertRaises(Exception) as caught:
        Imbox(
            self.server,
            username='[email protected]',
            password='test',
            ssl=False
        )

    # str() instead of the deprecated .message attribute
    self.assertIn(
        'Plaintext authentication disallowed', str(caught.exception)
    )
def test_imap_auth_starttls(self):
    # plain IMAP on port 143, upgraded with STARTTLS before the login
    imap = IMAP4(self.server, port=143)

    status, data = imap.starttls()
    self.assertEqual(status, 'OK')

    status, data = imap.login('[email protected]', 'test')
    self.assertEqual(status, 'OK')
    self.assertEqual(data, ['Logged in'])

    status, data = imap.list()
    self.assertEqual(status, 'OK')
def test_imap_receive_email(self):
    # use a random text to find the right message again
    marker = uuid4().hex

    self.send_mail(
        sender='[email protected]',
        recipient='[email protected]',
        subject=marker,
        body=marker
    )
    self.wait_until_processed()

    # both user and root should get the aliased email
    for account in ('[email protected]', '[email protected]'):
        mailbox = Imbox(
            self.server, username=account, password='test', ssl=True
        )

        matching = []
        for entry in mailbox.messages():
            # every entry carries the parsed message at index 1
            if entry[1].subject == marker:
                matching.append(entry)

        self.assertEqual(len(matching), 1)
def test_pop3_receive_email(self):
    # use a random text to find the right message again
    marker = uuid4().hex

    self.send_mail(
        sender='[email protected]',
        recipient='[email protected]',
        subject=marker,
        body=marker
    )
    self.wait_until_processed()

    mailbox = poplib.POP3_SSL(self.server)
    try:
        mailbox.user('[email protected]')
        mailbox.pass_('test')

        listing = mailbox.list()[1]
        hits = 0

        # POP3 message numbers start at 1
        for number in range(1, len(listing) + 1):
            lines = mailbox.retr(number)[1]
            hits += sum(
                1 for line in lines if marker in line and 'Subject' in line
            )

        self.assertEqual(hits, 1)
    finally:
        mailbox.quit()  # unlock mailbox
def test_pop3_auth_no_plain(self):
    # we can only really check if no plain text auth works with pop3,
    # not if starttls works, because python's library does not support that
    # but we do care more about plaintext not working anyway
    pop = poplib.POP3(self.server)

    # assertRaises is used instead of try/except/else with a bare assert,
    # so the check is not lost when python runs with -O
    with self.assertRaises(Exception) as caught:
        pop.user('[email protected]')

    # str() instead of the deprecated .message attribute
    self.assertIn(
        'Plaintext authentication disallowed', str(caught.exception)
    )
def test_outbound_policy(self):
    # policyd's outbound policy restricts the email by counting upwards
    # with each new mail, whilst counting downwards over time which is
    # why the code below is able to send 4 messages even though the limit
    # is 3 messages in 3 seconds.
    def send():
        return self.send_mail(
            sender='[email protected]',
            user='[email protected]',
            recipient='[email protected]',
            subject='test',
            body='test'
        )

    # stays None if none of the mails below is refused
    code, msg = None, None

    # the limit will kick in anywhere between 3 and 6 messages depending
    # on how fast the test can be run / how many times they are re-run
    # in a short period of time
    try:
        for _ in range(6):
            send()
    except SMTPRecipientsRefused as e:
        # recipients maps each refused address to its (code, message) pair
        code, msg = e.recipients['[email protected]']

    self.assertIsNotNone(code, "none of the mails was refused")
    self.assertIsNotNone(msg, "none of the mails was refused")

    self.assertIn('Policy rejection; Message count quota exceeded', msg)
    self.assertEqual(code, 450)
def test_send_virus_signature(self):
    # send the EICAR virus test signature to ensure that the
    # email is found to have a virus and is in effect dropped silently
    # see http://www.eicar.org/86-0-Intended-use.html
    signature = (
        # the backslash is escaped explicitly, a bare "\P" is an invalid
        # escape sequence (the resulting string is unchanged)
        'X5O!P%@AP[4\\PZX54(P^)7CC)7}$'
        'EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*'
    )

    text = uuid4().hex

    self.send_mail(
        sender='[email protected]',
        recipient='[email protected]',
        subject=text,
        body=text,
        attachments=[signature]
    )
    self.wait_until_processed()

    # the mail should have been dropped
    imbox = Imbox(
        self.server,
        username='[email protected]',
        password='test',
        ssl=True
    )

    messages = [m for m in imbox.messages() if m[1].subject == text]
    self.assertEqual(len(messages), 0)
def test_default_imap_folders(self):
    # see 10-mail.conf.erb
    expected_folders = ['Drafts', 'Sent', 'Spam', 'Trash', 'Archive']

    mailbox = Imbox(
        self.server, username='[email protected]', password='test', ssl=True
    )
    listing = mailbox.server.list_folders()[1]

    # every expected name has to show up in exactly one listed entry
    for name in expected_folders:
        hits = [entry for entry in listing if name in entry]
        self.assertEqual(len(hits), 1)
def test_spam_blacklist(self):
    # a mail with a random subject and an empty body is sent, it is
    # expected to be filed as spam
    marker = uuid4().hex

    self.send_mail(
        sender='[email protected]',
        recipient='[email protected]',
        subject=marker,
        body='',
        user='[email protected]',
        pw='test'
    )
    self.wait_until_processed()

    mailbox = Imbox(
        self.server, username='[email protected]', password='test', ssl=True
    )

    # it should not be in the normal folder
    in_inbox = [e for e in mailbox.messages() if e[1].subject == marker]
    self.assertEqual(len(in_inbox), 0)

    # it will be in the spam folder
    in_spam = [
        e for e in mailbox.messages(folder='Spam') if e[1].subject == marker
    ]
    self.assertEqual(len(in_spam), 1)

    # turn the header list into a name -> value mapping
    header_map = {
        header['Name']: header['Value'] for header in in_spam[0][1].headers
    }
    self.assertTrue(header_map['X-Spam-Status'].startswith('Yes'))
def test_receive_spam(self):
    # send the GTUBE spam test signature and ensure that the email
    # headers reflect the fact that it's spam
    gtube = (
        'XJS*C4JDBQADN1.NSBN3*2IDNEN*'
        'GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X'
    )
    marker = uuid4().hex

    self.send_mail(
        sender='[email protected]',
        recipient='[email protected]',
        subject=marker,
        body=gtube
    )
    self.wait_until_processed()

    mailbox = Imbox(
        self.server, username='[email protected]', password='test', ssl=True
    )

    # it should not be in the normal folder
    in_inbox = [e for e in mailbox.messages() if e[1].subject == marker]
    self.assertEqual(len(in_inbox), 0)

    # it will be in the spam folder
    in_spam = [
        e for e in mailbox.messages(folder='Spam') if e[1].subject == marker
    ]
    self.assertEqual(len(in_spam), 1)

    # turn the header list into a name -> value mapping
    header_map = {
        header['Name']: header['Value'] for header in in_spam[0][1].headers
    }
    self.assertTrue(header_map['X-Spam-Status'].startswith('Yes'))
def test_manage_sieve(self):
    # uploads a sieve script that rejects mails containing a random token,
    # sends such a mail and checks that the rejection notice comes back
    client = managesieve.Client(self.server)

    # tls only
    result = client.connect('[email protected]', 'test', starttls=False)
    self.assertFalse(result)

    result = client.connect('[email protected]', 'test', starttls=True)
    self.assertTrue(result)

    token = uuid4().hex

    script = textwrap.dedent("""
        require "body";
        require "reject";
        if body :contains "%s" {
        reject "please do not send me anymore e-mails";
        }
    """ % token)

    try:
        # check the upload before trying to activate the script
        script_put = client.putscript('test', script)
        self.assertTrue(script_put, "sieve script syntax error")
        client.setactive('test')

        self.send_mail(
            sender='[email protected]',
            user='[email protected]',
            recipient='[email protected]',
            subject=token,
            body=token
        )
        self.wait_until_processed()

        # rejecting the email means that it's sent back to the user
        # (it's not actually rejected during the sending, because
        # that would obviously be way too slow)
        imbox = Imbox(
            self.server,
            username='[email protected]',
            password='test',
            ssl=True
        )

        messages = [m for m in imbox.messages() if token in m[1].subject]
        self.assertEqual(len(messages), 1)

        # take the matched entry explicitly; the loop variable of the
        # comprehension above would be the last message iterated on
        # Python 2 and is not defined at all on Python 3
        matched = messages[0]
        uid, message = matched[0], matched[1]

        self.assertEqual('Rejected: {}'.format(token), message.subject)
        self.assertEqual(
            message.sent_from[0]['email'], '[email protected]'
        )

        # imbox doesn't handle multipart/report messages well so
        # we need to peek into the raw message
        raw = imbox.connection.uid('fetch', uid, '(BODY.PEEK[])')[1][0][1]
        self.assertIn('please do not send me anymore e-mails', raw)
    finally:
        client.deletescript('test')
# run the whole suite when the file is executed as a script
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment