Created
February 21, 2020 12:19
-
-
Save K0rt3x3rr0r/72b29855618eb92be7ab5812f55819d8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import ldap | |
import ldap3 | |
import re | |
import psycopg2 | |
import base64 | |
from ldap3 import Server, Connection, ALL | |
from flask import current_app, jsonify, request | |
from flask_cors import cross_origin | |
from alerta.auth.utils import create_token, get_customers | |
from alerta.exceptions import ApiError | |
from alerta.models.permission import Permission | |
from alerta.models.user import User | |
from alerta.utils.audit import auth_audit_trail | |
from . import auth | |
# Role lists for which both the ``customer`` and ``customer1`` columns are read
# when an already-known user logs in.
_TWO_COLUMN_ROLE_SETS = (
    ['myFirstRoleGroup'],
    ['mySecondRoleGroup'],
    ['myThirdRoleGroup'],
)

# Provisioning rules for first-time logins, evaluated in order (first match
# wins so a user is created exactly once). Each rule is:
#   (directory group name fragments, local role, user text, customers)
_LDAP_GROUP_RULES = (
    (('myOneGroupAD', 'myTwoGroupAD', 'myThreeGroupAD'),
     'myFirstRoleGroup', 'LDAP test user', ('CustomerGroup1', 'CustomerGroup2')),
    (('myFiveGroupAD', 'mySixGroupAD', 'mySevenGroupAD', 'myEightGroupAD', 'myNineGroupAD'),
     'mySecondRoleGroup', 'LDAP user', ('CustomerGroup1', 'CustomerGroup2')),
    (('myTenGroupAD', 'myElevenGroupAD', 'myFourGroupAD'),
     'myThirdRoleGroup', 'LDAP user', ('CustomerGroup1',)),
    (('myTwelveGroupAD',),
     'myFourRoleGroup', 'LDAP user', ('CustomerGroup1',)),
    (('myThirteenGroupAD', 'myFourteenGroupAD', 'myFiveteenGroupAD'),
     'myFiveRoleGroup', 'LDAP user', ('CustomerGroup', 'CustomerGroup1')),
)


def _execute_on_users_db(sql, params, fetch=False):
    """Run one parameterized statement against the configured database.

    The statement is committed and the connection is always closed, even when
    the statement raises. Returns the first row when ``fetch`` is true,
    otherwise ``None``.
    """
    cfg = current_app.config
    # NOTE(review): the database *user* is taken from DATABASE_NAME, exactly as
    # in the original code - confirm this is intentional.
    conn = psycopg2.connect(database=cfg['DATABASE_NAME'], user=cfg['DATABASE_NAME'],
                            host=cfg['DATABASE_HOST'], password=cfg['DATABASE_PASSWORD'],
                            port=cfg['DATABASE_PORT'])
    try:
        with conn.cursor() as cur:
            # Values are passed as bound parameters, never interpolated into
            # the SQL text: the login comes straight from the client request.
            cur.execute(sql, params)
            row = cur.fetchone() if fetch else None
        conn.commit()
        return row
    finally:
        conn.close()


def _load_customers(sql, login_id):
    """Return the non-NULL column values of the first row selected for ``login_id``."""
    row = _execute_on_users_db(sql, (login_id,), fetch=True)
    return [value for value in (row or ()) if value is not None]


def _store_customers(login_id, customers):
    """Write one or two customers into the ``users`` row of ``login_id``."""
    if len(customers) > 1:
        sql = "UPDATE users SET customer=%s, customer1=%s WHERE login=%s"
        params = (customers[0], customers[1], login_id)
    else:
        sql = "UPDATE users SET customer=%s WHERE login=%s"
        params = (customers[0], login_id)
    _execute_on_users_db(sql, params)


def _find_ldap_memberships(username):
    """Return the group-membership attribute (as a string) of the directory entry for ``username``.

    Binds with the configured service account, runs the configured search once
    and returns the membership attribute of the first entry whose DN attribute
    contains ``username`` (case-insensitive). Returns ``None`` when no entry
    matches.
    """
    cfg = current_app.config
    bind_dn = cfg['LDAP_BINDDN']
    bind_pw = cfg['LDAP_PW']
    server = ldap3.Server(cfg['LDAP_URL'])
    search_base = cfg['LDAP_BASE']
    search_filter = cfg['LDAP_FILTER']
    attrs = cfg['LDAP_ATTRS']
    # NOTE(review): this is a substring match on the DN attribute, as in the
    # original code; a short username can match another user's entry.
    user_pattern = re.compile(re.escape(username), re.I)
    with ldap3.Connection(server, bind_dn, bind_pw) as conn:
        conn.search(search_base, search_filter, attributes=attrs)
        for entry in conn.entries:
            if user_pattern.search(str(entry[cfg['LDAP_ATTRS_DN']])):
                return str(entry[cfg['LDAP_ATTRS_MEMBEROF']])
    return None


def _match_group_rule(memberships):
    """Return ``(role, text, customers)`` of the first rule matching ``memberships``, or ``None``."""
    for fragments, role, text, customers in _LDAP_GROUP_RULES:
        if any(re.search(re.escape(fragment), memberships, re.I) for fragment in fragments):
            return role, text, list(customers)
    return None


def _issue_token(user, login_id, groups):
    """Resolve scopes and customers, send the audit event and return the JSON token response."""
    scopes = Permission.lookup(login=login_id, roles=user.roles)
    customers = get_customers(login=login_id, groups=[user.domain] + groups)
    auth_audit_trail.send(current_app._get_current_object(), event='basic-ldap-login',
                          message='user login via LDAP', user=login_id, customers=customers,
                          scopes=scopes, resource_id=user.id, type='user', request=request)
    token = create_token(user_id=user.id, name=user.name, login=user.email, provider='ldap',
                         customers=customers, scopes=scopes, roles=user.roles, email=user.email,
                         email_verified=user.email_verified)
    return jsonify(token=token.tokenize)


@auth.route('/auth/login', methods=['OPTIONS', 'POST'])
@cross_origin(supports_credentials=True)
def login():
    """Authenticate a user against LDAP and return a signed token.

    The JSON request body must contain ``password`` and either ``username`` or
    ``email``. The credentials are checked with an LDAP simple bind. A user
    that already exists locally gets a token built from the customers stored
    in the ``users`` table. An unknown user is looked up in the directory,
    created with the role of the first matching group rule, has the rule's
    customers stored, and then gets a token.

    Returns:
        A JSON response of the form ``{"token": ...}``.

    Raises:
        ApiError: 401 for missing or invalid credentials, 403 when the user is
            not found in the directory search or matches no group rule, 500
            for any other failure while binding to the LDAP server.
    """
    # Imported locally so the block does not depend on a new file-level import.
    from ldap3.core.exceptions import LDAPBindError

    # Allow LDAP server to use a self signed certificate.
    # NOTE(review): this sets a python-ldap global option, while the binds
    # below go through ldap3 - confirm it has the intended effect.
    if current_app.config['LDAP_ALLOW_SELF_SIGNED_CERT']:
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)

    # Retrieve required fields from client request.
    payload = request.json
    if not isinstance(payload, dict):
        raise ApiError("must supply 'username' and 'password'", 401)
    try:
        login_id = payload.get('username', None) or payload['email']
        password = payload['password']
    except KeyError:
        raise ApiError("must supply 'username' and 'password'", 401)
    if not password:
        raise ApiError("invalid username or password 1", 401)
    if not login_id or not isinstance(login_id, str):
        raise ApiError('invalid username or password', 401)

    if '\\' in login_id:
        # DOMAIN\user form: split only on the first backslash so that extra
        # backslashes cannot raise ValueError.
        _, username = login_id.split('\\', 1)
        email = ''
        email_verified = False
    else:
        username = login_id
        # NOTE(review): the suffix is appended verbatim, without an '@'
        # separator - confirm the real domain suffix includes it.
        email = login_id + 'mydomain.com'
        email_verified = True
    if not username:
        # An empty name would match every directory entry in the search below.
        raise ApiError('invalid username or password', 401)

    # Verify the password with a simple bind.
    # NOTE(review): the bind identity is ``email``, which is empty for
    # DOMAIN\user logins - confirm that login form is meant to be supported.
    try:
        server = Server(current_app.config['LDAP_URL'], use_ssl=True, get_info=ALL)
        bound = Connection(server, user=email, password=password, auto_bind=True, version=3,
                           authentication='SIMPLE', client_strategy='SYNC', auto_referrals=True,
                           check_names=True, read_only=False, lazy=False, raise_exceptions=False)
        # The connection is only needed to prove the credentials; release it.
        bound.unbind()
    except (ldap.INVALID_CREDENTIALS, LDAPBindError):
        # ldap3 signals a failed automatic bind with LDAPBindError.
        raise ApiError('invalid username or password', 401)
    except Exception as e:
        raise ApiError(str(e), 500)

    # casefold() makes the local user lookup case-insensitive.
    login_id = login_id.casefold()
    email = email.casefold()
    user = User.find_by_username(username=login_id)

    if user:
        if user.roles in _TWO_COLUMN_ROLE_SETS:
            groups = _load_customers(
                "SELECT customer, customer1 FROM users WHERE login=%s", login_id)
        else:
            user.update_last_login()
            # Keyed on ``login`` like every other statement on this table.
            groups = _load_customers(
                "SELECT customer FROM users WHERE login=%s", login_id)
        return _issue_token(user, login_id, groups)

    # Unknown user: find the directory entry and derive role and customers
    # from its group memberships.
    memberships = _find_ldap_memberships(username)
    if memberships is None:
        raise ApiError('user %s was not found in the LDAP directory search' % username, 403)
    rule = _match_group_rule(memberships)
    if rule is None:
        raise ApiError('user %s is not a member of an authorized LDAP group' % username, 403)
    role, text, groups = rule

    user = User(name=username, login=login_id, password='', email=email,
                roles=[role], text=text, email_verified=email_verified)
    user = user.create()
    _store_customers(login_id, groups)
    return _issue_token(user, login_id, groups)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment