Created
February 24, 2014 10:32
-
-
Save href/9185284 to your computer and use it in GitHub Desktop.
Tests for Seantis Postfix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import unittest | |
| import poplib | |
| import textwrap | |
| from uuid import uuid4 | |
| from email import Encoders | |
| from email.base64mime import encode | |
| from email.MIMEBase import MIMEBase | |
| from email.mime.text import MIMEText | |
| from email.MIMEMultipart import MIMEMultipart | |
| from smtplib import SMTP, SMTPRecipientsRefused, SMTPException | |
| from imbox import Imbox | |
| from sievelib import managesieve | |
| from imaplib2 import IMAP4 | |
| # the timeout used for clamav and spamassassin processing | |
| processing_timeout = 1.0 | |
| class TestPostfixServer(unittest.TestCase): | |
| server = 'mail.seantis.dev' | |
| client = 'example.org' | |
| def encode_auth(self, username, password): | |
| return encode('\0{}\0{}'.format(username, password), eol="") | |
| def smtp(self, server=None, port=25): | |
| smtp = SMTP() | |
| smtp.connect(server or self.server, port) | |
| return smtp | |
| def send_mail( | |
| self, sender, recipient, subject, body, | |
| user=None, pw=None, attachments=[] | |
| ): | |
| # this is how a client should do it usually, so we do it as well | |
| smtp = self.smtp(port=587) | |
| smtp.starttls() | |
| smtp.login(user or '[email protected]', pw or 'test') | |
| if attachments: | |
| msg = MIMEMultipart(body) | |
| else: | |
| msg = MIMEText(body) | |
| msg['From'] = sender | |
| msg['To'] = recipient | |
| msg['Subject'] = subject | |
| for ix, attachment in enumerate(attachments): | |
| part = MIMEBase('application', "octet-stream") | |
| part.set_payload(attachment) | |
| Encoders.encode_base64(part) | |
| part.add_header( | |
| 'Content-Disposition', | |
| 'attachment; filename="file_{}"'.format(ix) | |
| ) | |
| msg.attach(part) | |
| return smtp.sendmail(sender, recipient, msg.as_string()) | |
| def wait_until_processed(self): | |
| time.sleep(processing_timeout) | |
| def test_connect(self): | |
| smtp = SMTP() | |
| code, msg = smtp.connect(self.server, 25) | |
| self.assertEqual(code, 220) | |
| self.assertEqual(msg, 'mail.seantis.ch ESMTP Postfix') | |
| def test_login_ehlo(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| def test_login_helo(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.helo(self.client) | |
| # login cannot be called here because there's a bug in python: | |
| # http://bugs.python.org/issue6683 | |
| smtp.docmd('AUTH PLAIN', self.encode_auth('[email protected]', 'test')) | |
| def test_helo_required(self): | |
| smtp = self.smtp() | |
| code, msg = smtp.docmd("MAIL FROM: <[email protected]>") | |
| self.assertIn("send HELO/EHLO first", msg) | |
| self.assertEqual(code, 503) | |
| def test_not_an_open_relay(self): | |
| smtp = self.smtp() | |
| smtp.ehlo('google.ch') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Relay access denied', msg) | |
| self.assertEqual(code, 554) | |
| def test_not_send_to_self_unauthenticated(self): | |
| smtp = self.smtp() | |
| smtp.ehlo('example.org') | |
| code, msg = smtp.docmd('mail from: [email protected]') | |
| self.assertIn('Sender address rejected: not logged in', msg) | |
| self.assertEqual(code, 553) | |
| def test_authenticated_only_recipients(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo('example.org') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Recipient address rejected: Access denied', msg) | |
| self.assertEqual(code, 554) | |
| def test_authenticated_only_recipients_logged_in(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Ok', msg) | |
| self.assertEqual(code, 250) | |
| def test_invalid_client_hostname(self): | |
| smtp = self.smtp() | |
| smtp.ehlo('google.ch') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Client host rejected', msg) | |
| self.assertEqual(code, 450) | |
| def test_invalid_recipient(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Recipient address rejected: User unknown', msg) | |
| self.assertEqual(code, 550) | |
| def test_non_fqdn_recipient(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: user') | |
| self.assertIn('need fully-qualified address', msg) | |
| self.assertEqual(code, 504) | |
| def test_unknown_recipient_domain(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Domain not found', msg) | |
| self.assertEqual(code, 450) | |
| def test_smtp_without_tls(self): | |
| smtp = self.smtp() | |
| smtp.ehlo(self.client) | |
| self.assertRaises( | |
| SMTPException, smtp.login, '[email protected]', 'test' | |
| ) | |
| def test_valid_send_ssl(self): | |
| smtp = self.smtp() | |
| smtp.ehlo(self.client) | |
| smtp.starttls() | |
| smtp.login('[email protected]', 'test') | |
| smtp.docmd('mail from: [email protected]') | |
| code, msg = smtp.docmd('rcpt to: [email protected]') | |
| self.assertIn('Ok', msg) | |
| self.assertEqual(code, 250) | |
| def test_disable_vrfy_command(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| code, msg = smtp.verify('[email protected]') | |
| self.assertIn('VRFY command is disabled', msg) | |
| self.assertEqual(code, 502) | |
| def test_invalid_helos(self): | |
| smtp = self.smtp() | |
| code, msg = smtp.ehlo('mail.seantis.ch') | |
| self.assertIn("Don't use my hostname", msg) | |
| self.assertEqual(code, 554) | |
| code, msg = smtp.ehlo('iris.seantis.ch') | |
| self.assertIn("Don't use my hostname", msg) | |
| self.assertEqual(code, 554) | |
| code, msg = smtp.ehlo('[192.168.101.119]') | |
| self.assertIn("Don't use my IP address", msg) | |
| self.assertEqual(code, 554) | |
| code, msg = smtp.ehlo('192.168.101.119') | |
| self.assertIn('Your software is not RFC 2821 compliant', msg) | |
| self.assertEqual(code, 554) | |
| def test_sender_spoofing(self): | |
| # users may not send emails with the address of another user | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| code, msg = smtp.docmd('mail from: [email protected]') | |
| self.assertIn('Sender address rejected: not owned by user', msg) | |
| self.assertEqual(code, 553) | |
| # users which receive aliased mails may send emails through them | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| code, msg = smtp.docmd('mail from: [email protected]') | |
| self.assertIn('Ok', msg) | |
| self.assertEqual(code, 250) | |
| # [email protected] is allowed to send for the other users | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| code, msg = smtp.docmd('mail from: [email protected]') | |
| self.assertIn('Ok', msg) | |
| self.assertEqual(code, 250) | |
| # [email protected] should still be able to send for himself | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| smtp.login('[email protected]', 'test') | |
| code, msg = smtp.docmd('mail from: [email protected]') | |
| self.assertIn('Ok', msg) | |
| self.assertEqual(code, 250) | |
| def test_any_sender_does_not_receive(self): | |
| # the [email protected] may send as any other adress, but he will | |
| # never receive emails in these tests | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| self.assertEqual(len(list(imbox.messages())), 0) | |
| def test_tls(self): | |
| smtp = self.smtp() | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| code, msg = smtp.login('[email protected]', 'test') | |
| self.assertIn('Authentication successful', msg) | |
| self.assertEqual(code, 235) | |
| def test_tls_forced_on_submission(self): | |
| # using submission on port 587 requires TLS | |
| smtp = self.smtp(port=587) | |
| smtp.ehlo(self.client) | |
| code, msg = smtp.docmd( | |
| 'AUTH PLAIN', self.encode_auth('[email protected]', 'test') | |
| ) | |
| self.assertIn('Must issue a STARTTLS command first', msg) | |
| self.assertEqual(code, 530) | |
| smtp.starttls() | |
| smtp.ehlo(self.client) | |
| code, msg = smtp.docmd( | |
| 'AUTH PLAIN', self.encode_auth('[email protected]', 'test') | |
| ) | |
| self.assertIn('Authentication successful', msg) | |
| self.assertEqual(code, 235) | |
| def test_imap_auth(self): | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| self.assertTrue(imbox.connection) | |
| def test_imap_auth_no_plain(self): | |
| try: | |
| Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=False | |
| ) | |
| except Exception, e: | |
| self.assertIn('Plaintext authentication disallowed', e.message) | |
| else: | |
| assert False, "Should have failed" | |
| def test_imap_auth_starttls(self): | |
| connection = IMAP4(self.server, port=143) | |
| msg, result = connection.starttls() | |
| self.assertEqual(msg, 'OK') | |
| msg, result = connection.login('[email protected]', 'test') | |
| self.assertEqual(msg, 'OK') | |
| self.assertEqual(result, ['Logged in']) | |
| msg, result = connection.list() | |
| self.assertEqual(msg, 'OK') | |
| def test_imap_receive_email(self): | |
| # use a random text to find the right message again | |
| text = uuid4().hex | |
| self.send_mail( | |
| sender='[email protected]', | |
| recipient='[email protected]', | |
| subject=text, | |
| body=text | |
| ) | |
| self.wait_until_processed() | |
| # both user and root should get the aliased email | |
| for user in ('[email protected]', '[email protected]'): | |
| imbox = Imbox( | |
| self.server, | |
| username=user, | |
| password='test', | |
| ssl=True | |
| ) | |
| messages = [m for m in imbox.messages() if m[1].subject == text] | |
| self.assertEqual(len(messages), 1) | |
| def test_pop3_receive_email(self): | |
| # use a random text to find the right message again | |
| text = uuid4().hex | |
| self.send_mail( | |
| sender='[email protected]', | |
| recipient='[email protected]', | |
| subject=text, | |
| body=text | |
| ) | |
| self.wait_until_processed() | |
| pop = poplib.POP3_SSL(self.server) | |
| try: | |
| pop.user('[email protected]') | |
| pop.pass_('test') | |
| found_emails = 0 | |
| for i in range(len(pop.list()[1])): | |
| for j in pop.retr(i+1)[1]: | |
| if text in j and 'Subject' in j: | |
| found_emails += 1 | |
| self.assertEqual(found_emails, 1) | |
| finally: | |
| pop.quit() # unlock mailbox | |
| def test_pop3_auth_no_plain(self): | |
| # we can only really check if no plain text auth works with pop3, | |
| # not if starttls works, because python's library does not support that | |
| # but we do care more about plaintext not working anyway | |
| pop = poplib.POP3(self.server) | |
| try: | |
| pop.user('[email protected]') | |
| except Exception, e: | |
| self.assertIn('Plaintext authentication disallowed', e.message) | |
| else: | |
| assert False, "Exception should have occured" | |
| def test_outbound_policy(self): | |
| # policyd's outbound policy restricts the email by counting upwards | |
| # with each new mail, whilst counting downwards over time which is | |
| # why the code below is able to send 4 messages even though the limit | |
| # is 3 messages in 3 seconds. | |
| def send(): | |
| return self.send_mail( | |
| sender='[email protected]', | |
| user='[email protected]', | |
| recipient='[email protected]', | |
| subject='test', | |
| body='test' | |
| ) | |
| # the limit will kick in anywhere between 3 and 6 messages depending | |
| # on how fast the test can be run / how many times they are re-run | |
| # in a short period of time | |
| try: | |
| for i in range(0, 6): | |
| send() | |
| except SMTPRecipientsRefused, e: | |
| code, msg = e.recipients['[email protected]'] | |
| else: | |
| code, msg = None, None | |
| self.assertNotEqual(code, None) | |
| self.assertNotEqual(msg, None) | |
| self.assertIn('Policy rejection; Message count quota exceeded', msg) | |
| self.assertEqual(code, 450) | |
| def test_send_virus_signature(self): | |
| # send the EICAR virus test signature to ensure that the | |
| # email is found to have a virus and is in effect dropped silently | |
| # see http://www.eicar.org/86-0-Intended-use.html | |
| signature = ( | |
| 'X5O!P%@AP[4\PZX54(P^)7CC)7}$' | |
| 'EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' | |
| ) | |
| text = uuid4().hex | |
| self.send_mail( | |
| sender='[email protected]', | |
| recipient='[email protected]', | |
| subject=text, | |
| body=text, | |
| attachments=[signature] | |
| ) | |
| self.wait_until_processed() | |
| # the mail should have been dropped | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| messages = [m for m in imbox.messages() if m[1].subject == text] | |
| self.assertEqual(len(messages), 0) | |
| def test_default_imap_folders(self): | |
| # see 10-mail.conf.erb | |
| default_folders = ['Drafts', 'Sent', 'Spam', 'Trash', 'Archive'] | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| server = imbox.server.list_folders()[1] | |
| for folder in default_folders: | |
| self.assertEqual(len([f for f in server if folder in f]), 1) | |
| def test_spam_blacklist(self): | |
| text = uuid4().hex | |
| self.send_mail( | |
| sender='[email protected]', | |
| recipient='[email protected]', | |
| subject=text, | |
| body='', | |
| user='[email protected]', | |
| pw='test' | |
| ) | |
| self.wait_until_processed() | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| # it should not be in the normal folder | |
| messages = [m for m in imbox.messages() if m[1].subject == text] | |
| self.assertEqual(len(messages), 0) | |
| # it will be in the spam folder | |
| messages = [ | |
| m for m in imbox.messages(folder='Spam') if m[1].subject == text | |
| ] | |
| self.assertEqual(len(messages), 1) | |
| headers = messages[0][1].headers | |
| headers = dict((h['Name'], h['Value']) for h in headers) | |
| self.assertTrue(headers['X-Spam-Status'].startswith('Yes')) | |
| def test_receive_spam(self): | |
| # send the GTUBE spam test signature and ensure that the email | |
| # headers reflect the fact that it's spam | |
| signature = ( | |
| 'XJS*C4JDBQADN1.NSBN3*2IDNEN*' | |
| 'GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X' | |
| ) | |
| text = uuid4().hex | |
| self.send_mail( | |
| sender='[email protected]', | |
| recipient='[email protected]', | |
| subject=text, | |
| body=signature | |
| ) | |
| self.wait_until_processed() | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| # it should not be in the normal folder | |
| messages = [m for m in imbox.messages() if m[1].subject == text] | |
| self.assertEqual(len(messages), 0) | |
| # it will be in the spam folder | |
| messages = [ | |
| m for m in imbox.messages(folder='Spam') if m[1].subject == text | |
| ] | |
| self.assertEqual(len(messages), 1) | |
| headers = messages[0][1].headers | |
| headers = dict((h['Name'], h['Value']) for h in headers) | |
| self.assertTrue(headers['X-Spam-Status'].startswith('Yes')) | |
| def test_manage_sieve(self): | |
| client = managesieve.Client(self.server) | |
| # tls only | |
| result = client.connect('[email protected]', 'test', starttls=False) | |
| self.assertFalse(result) | |
| result = client.connect('[email protected]', 'test', starttls=True) | |
| self.assertTrue(result) | |
| token = uuid4().hex | |
| script = textwrap.dedent(""" | |
| require "body"; | |
| require "reject"; | |
| if body :contains "%s" { | |
| reject "please do not send me anymore e-mails"; | |
| } | |
| """ % token) | |
| try: | |
| script_put = client.putscript('test', script) | |
| client.setactive('test') | |
| self.assertTrue(script_put, "sieve script syntax error") | |
| self.send_mail( | |
| sender='[email protected]', | |
| user='[email protected]', | |
| recipient='[email protected]', | |
| subject=token, | |
| body=token | |
| ) | |
| self.wait_until_processed() | |
| # rejecting the email means that it's sent back to the user | |
| # (it's not actually rejected during the sending, because | |
| # that would obviously be way too slow) | |
| imbox = Imbox( | |
| self.server, | |
| username='[email protected]', | |
| password='test', | |
| ssl=True | |
| ) | |
| messages = [m for m in imbox.messages() if token in m[1].subject] | |
| self.assertEqual(len(messages), 1) | |
| self.assertEqual('Rejected: {}'.format(token), m[1].subject) | |
| self.assertEqual( | |
| m[1].sent_from[0]['email'], '[email protected]' | |
| ) | |
| # imbox doesn't handle multipart/report messages well so | |
| # we need to peek into the raw message | |
| raw = imbox.connection.uid('fetch', m[0], '(BODY.PEEK[])')[1][0][1] | |
| self.assertIn('please do not send me anymore e-mails', raw) | |
| finally: | |
| client.deletescript('test') | |
| if __name__ == '__main__': | |
| unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment