Created
January 10, 2019 15:01
-
-
Save belyaev-pa/126a249a413c17ee5c0adda5327c521b to your computer and use it in GitHub Desktop.
email filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# -*- coding: utf-8 -*-
from email.parser import Parser | |
import syslog | |
import email | |
import base64 | |
import ldap | |
import sys | |
import argparse | |
import os | |
import subprocess | |
import time | |
# WHITE_LIST_TYPE = 'file' #'json'

# Charset label written into the encoded display names of rewritten headers.
ENCODING = 'utf-8'

# Config file that supplies the 'server_host' and 'search_base' parameters.
LDAP_ALIASES = '/etc/postfix/ldap_aliases.cf'
# LDAP_ADDR = '10.32.99.163'
# BASEDN = 'CN=accounts, DC=che1, DC=ru'

# LDAP attribute that holds the display name of a mailbox owner.
NAME_ATTR = 'gecos'

# White list file: one '<sender> <recipient>,<recipient>,...' entry per line.
WHITE_LIST_PATH = '/opt/white_list'

# Command used to hand the rewritten message back to the mail system.
SEND_COMMAND = '/usr/sbin/sendmail -G -i'

# Result codes, used as the process exit status.
# The non-zero values match the sysexits.h codes of the same name.
EX_OK = 0
EX_TEMPFAIL = 75
EX_UNAVAILABLE = 69
def get_ldap_host_and_dn():
    """Parse the LDAP server host and search base from the LDAP_ALIASES file.

    The file is read line by line as ``name = value`` pairs. Only the
    ``server_host`` and ``search_base`` parameters are used; lines without
    an ``=`` (blank lines, comments) are skipped instead of aborting the
    parse. When a parameter occurs more than once the last value wins.

    :return: tuple ``(host, base_dn)``; an element is None when its
        parameter is absent, and both are None when the file does not exist
    """
    if not os.path.exists(LDAP_ALIASES):
        syslog.syslog(syslog.LOG_INFO, 'Error: File ldap_aliases not found')
        return None, None

    host = None
    base_dn = None
    with open(LDAP_ALIASES, 'r') as ldap_conf:
        for line in ldap_conf:
            # partition never raises; an empty separator means "no '=' here".
            param, separator, value = line.partition('=')
            if not separator:
                continue
            param = param.strip()
            if param == 'server_host':
                host = value.strip()
            elif param == 'search_base':
                base_dn = value.strip()
    return host, base_dn
def get_name_from_ldap(finding_email):
    """Look up the display name (NAME_ATTR) of a mailbox in LDAP.

    The server and search base come from get_ldap_host_and_dn. The address
    is escaped before it is placed into the search filter, the connection
    is always released, and LDAP errors are reported as a temporary failure
    instead of propagating as an exception.

    :param finding_email: email address to search for (``mail`` attribute)
    :return: EX_TEMPFAIL, None | if the config is missing/incomplete or the
                                  LDAP request failed
             1, None | if no entry with a NAME_ATTR value was found
             0, 'base64 encoded string with name' | if all OK
    """
    # Local import: ldap.filter is a submodule that 'import ldap' does not
    # necessarily load.
    from ldap.filter import escape_filter_chars

    host, basedn = get_ldap_host_and_dn()
    if host is None or basedn is None:
        syslog.syslog(syslog.LOG_INFO, 'Error: File ldap_aliases not found or have no server_host/search_base param')
        return EX_TEMPFAIL, None

    # The address comes from message headers, so it must not be able to
    # inject filter syntax such as '*' or ')('.
    filter_exp = 'mail={}'.format(escape_filter_chars(finding_email))
    try:
        ad = ldap.initialize('ldap://' + host)
        try:
            ad.simple_bind_s()  # here we need to provide user and psswd
            result = ad.search_s(basedn, ldap.SCOPE_SUBTREE, filter_exp, [NAME_ATTR])
        finally:
            ad.unbind_s()
    except ldap.LDAPError as err:
        syslog.syslog(syslog.LOG_INFO,
                      'Error: LDAP lookup for {0} failed: {1}'.format(finding_email, err))
        return EX_TEMPFAIL, None

    syslog.syslog(syslog.LOG_INFO, 'result: {0}, {1}'.format(result, type(result)))
    for _, attrs in result:
        # Search references carry a list instead of an attribute dict, and
        # an entry may simply lack NAME_ATTR; neither provides a name.
        if not isinstance(attrs, dict):
            continue
        values = attrs.get(NAME_ATTR)
        if values:
            s = values[0]
            syslog.syslog(syslog.LOG_INFO, 's: {0}'.format(s))
            return 0, base64.b64encode(s)

    syslog.syslog(syslog.LOG_INFO, 'Error: Email {0} not found in LDAP'.format(finding_email))
    return 1, None
def check_white_list(mail_from, mail_to_list):
    """Check the recipients of a message against the sender's white list.

    Each line of WHITE_LIST_PATH has the form
    ``<sender> <recipient>,<recipient>,...``. For every line whose sender
    equals ``mail_from``, each recipient that is not listed on that line is
    logged, printed and counted as denied. Surrounding whitespace (including
    the line's trailing newline) is ignored, so the last recipient on a line
    is matched like any other. A sender line without recipients allows
    nobody. A sender that does not appear in the file is not restricted.

    :param mail_from: email from address - string
    :param mail_to_list: email to addresses - list
    :return: EX_TEMPFAIL if the white list file is missing,
             EX_UNAVAILABLE if at least one recipient is denied,
             EX_OK otherwise
    """
    if not os.path.exists(WHITE_LIST_PATH):
        syslog.syslog(syslog.LOG_INFO, 'Error: File white_list not found')
        return EX_TEMPFAIL

    denied = 0
    # TODO: make parse json instead of file
    with open(WHITE_LIST_PATH, 'r') as white_list:
        for line in white_list:
            entry = line.strip()
            if not entry:
                continue
            sender, _, recipients = entry.partition(' ')
            if sender != mail_from:
                continue
            allowed = set(addr.strip() for addr in recipients.split(','))
            for mail_to_addr in mail_to_list:
                if mail_to_addr not in allowed:
                    syslog.syslog(syslog.LOG_INFO, 'Deny from {0} to {1}'.format(mail_from, mail_to_addr))
                    print('Deny from {0} to {1}'.format(mail_from, mail_to_addr))
                    denied += 1
    if denied:
        return EX_UNAVAILABLE
    return EX_OK
def send_email(message, mail_from, mail_to):
    """Pipe a message into SEND_COMMAND for delivery.

    The command is started without a shell from an argument list, so
    characters in the envelope addresses cannot be interpreted as shell
    syntax. The exit status of the command is written to syslog; a failure
    to start the command is logged as well.

    :param message: message as string, e.g. *msg.as_string(unixfrom=True)*
    :param mail_from: address from - string
    :param mail_to: addresses to - string ' ' space separated
    :return: None
    """
    # Local import keeps this block self-contained.
    import shlex

    command = shlex.split(SEND_COMMAND) + ['-f', mail_from, '--'] + mail_to.split()

    # The pipe is binary: text must be encoded before it is written.
    if not isinstance(message, bytes):
        message = message.encode(ENCODING)

    try:
        proc = subprocess.Popen(command, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as err:
        syslog.syslog(syslog.LOG_INFO, 'Error: cannot start sendmail: {0}'.format(err))
        return
    # communicate() waits for the process to finish and sets returncode.
    proc.communicate(input=message)
    result = proc.returncode
    syslog.syslog(syslog.LOG_INFO, 'Result of sendmail: {0}.'.format(result))
def header_handler(msg, field_name):
    """Rewrite an address header using display names taken from LDAP.

    Every address in the header is looked up with get_name_from_ldap.
    An address with a name is written as
    ``=?<ENCODING>?B?<base64 name>?= <address>``; an address that LDAP does
    not know is written bare. Display names from the original header are
    not kept. The rewritten addresses are joined with ', ' and replace the
    header in place. A header that is absent is left untouched.

    :param msg: message parsed with email.parser Parser
    :param field_name: name of the header to handle, 'From', 'To' for example
    :return: 0 on success (also when the header is absent), otherwise the
        failure code reported by get_name_from_ldap
    """
    field = msg.get(field_name)
    if field is None:
        return 0

    rewritten = []
    for _, address in email.utils.getaddresses([field]):
        code, encoded_name = get_name_from_ldap(address)
        if code == 0:
            # b64encode returns bytes on Python 3; decode it properly rather
            # than trimming the characters of its repr.
            if not isinstance(encoded_name, str):
                encoded_name = encoded_name.decode('ascii')
            rewritten.append('=?{0}?B?{1}?= <{2}>'.format(ENCODING, encoded_name, address))
        elif code == 1:
            rewritten.append(address)
        else:
            return code
    msg.replace_header(field_name, ', '.join(rewritten))
    return 0
def main(message, mail_from, mail_to_list):
    """Filter one message and pass it on for delivery.

    Handling algorithm:
    1) Check mails in white list
    2) Replace To field
    3) Replace Cc field
    4) Replace From field
    5) Send mail

    A failure of the white list check or of a header rewrite stops the
    processing and its result code is returned unchanged, so a temporary
    failure stays temporary.

    :param message: full message as file (object with a read() method)
    :param mail_from: parameter 'from' from console command
    :param mail_to_list: parameter 'to' from console command
    :return: result code: EX_OK, EX_TEMPFAIL or EX_UNAVAILABLE
    """
    if mail_from is None or not mail_to_list:
        syslog.syslog(syslog.LOG_INFO,
                      'Error: mail_from({0}) or mail_to({1}) is empty'.format(mail_from, mail_to_list))
        return EX_UNAVAILABLE

    white_list_result = check_white_list(mail_from, mail_to_list)
    if white_list_result != EX_OK:
        return white_list_result

    msg = Parser().parsestr(message.read())
    for field_name in ('To', 'Cc', 'From'):
        result = header_handler(msg, field_name)
        if result != EX_OK:
            return result

    send_email(msg.as_string(unixfrom=True), mail_from, ' '.join(mail_to_list))
    return EX_OK
if __name__ == '__main__':
    # Script entry point: the sender is taken from argv[2], the recipients
    # from argv[4:] and the message itself from standard input.
    # NOTE(review): presumably invoked as '<script> -f <from> -- <to>...';
    # confirm against the mail system's pipe configuration.
    syslog.openlog('mail_filter')
    syslog.syslog(syslog.LOG_INFO, 'Start filter.py ')
    syslog.syslog(syslog.LOG_INFO, 'Full args: {0}'.format(' '.join(sys.argv)))
    if len(sys.argv) < 3:
        # Without this guard a short command line ends in an IndexError
        # traceback instead of a result code.
        syslog.syslog(syslog.LOG_INFO, 'Error: not enough arguments, sender is expected in argv[2]')
        res = EX_UNAVAILABLE
    else:
        std_from, std_to, std_in = sys.argv[2], sys.argv[4:], sys.stdin
        syslog.syslog(syslog.LOG_INFO, 'From: {0}'.format(std_from))
        syslog.syslog(syslog.LOG_INFO, 'To: {0}'.format(std_to))
        res = main(std_in, std_from, std_to)
    syslog.syslog(syslog.LOG_INFO, 'Stop filter.py. Result: {0}'.format(res))
    syslog.closelog()
    sys.exit(res)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment