Skip to content

Instantly share code, notes, and snippets.

@K0rt3x3rr0r
Created February 21, 2020 12:19
Show Gist options
  • Save K0rt3x3rr0r/72b29855618eb92be7ab5812f55819d8 to your computer and use it in GitHub Desktop.
Save K0rt3x3rr0r/72b29855618eb92be7ab5812f55819d8 to your computer and use it in GitHub Desktop.
import sys
import ldap
import ldap3
import re
import psycopg2
import base64
from ldap3 import Server, Connection, ALL
from flask import current_app, jsonify, request
from flask_cors import cross_origin
from alerta.auth.utils import create_token, get_customers
from alerta.exceptions import ApiError
from alerta.models.permission import Permission
from alerta.models.user import User
from alerta.utils.audit import auth_audit_trail
from . import auth
# Columns of the ``users`` table that hold a user's customer assignments,
# in the order the values of a role rule are written to them.
_CUSTOMER_COLUMNS = ('customer', 'customer1')

# Parameterized queries used to read the customers of an already-known user.
_SELECT_CUSTOMERS_BY_LOGIN = 'SELECT customer, customer1 FROM users WHERE login=%s'
# NOTE(review): this query filters on ``name`` although the value passed is
# the (casefolded) login, exactly as the previous code did -- confirm that
# ``login`` was not intended here.
_SELECT_CUSTOMER_BY_NAME = 'SELECT customer FROM users WHERE name=%s'

# Existing users whose roles equal one of these lists get both customer
# columns read and do not have their last-login timestamp updated.
_DUAL_CUSTOMER_ROLES = (
    ['myFirstRoleGroup'],
    ['mySecondRoleGroup'],
    ['myThirdRoleGroup'],
)

# Ordered rules applied on first login:
#   (directory group names, role, user description, customers)
# A rule matches when any of its group names occurs (case-insensitively) in
# the user's group-membership attribute. The first matching rule wins, so a
# user that belongs to several rule sets is created exactly once.
_GROUP_ROLE_RULES = (
    (('myOneGroupAD', 'myTwoGroupAD', 'myThreeGroupAD'),
     'myFirstRoleGroup', 'LDAP test user',
     ('CustomerGroup1', 'CustomerGroup2')),
    (('myFiveGroupAD', 'mySixGroupAD', 'mySevenGroupAD', 'myEightGroupAD',
      'myNineGroupAD'),
     'mySecondRoleGroup', 'LDAP user',
     ('CustomerGroup1', 'CustomerGroup2')),
    (('myTenGroupAD', 'myElevenGroupAD', 'myFourGroupAD'),
     'myThirdRoleGroup', 'LDAP user',
     ('CustomerGroup1',)),
    (('myTwelveGroupAD',),
     'myFourRoleGroup', 'LDAP user',
     ('CustomerGroup1',)),
    (('myThirteenGroupAD', 'myFourteenGroupAD', 'myFiveteenGroupAD'),
     'myFiveRoleGroup', 'LDAP user',
     ('CustomerGroup', 'CustomerGroup1')),
)


def _connect_db():
    """Open a new psycopg2 connection using the DATABASE_* app settings."""
    config = current_app.config
    # NOTE(review): the database user is taken from DATABASE_NAME, as in the
    # previous code -- confirm this is intended (database name == role name).
    return psycopg2.connect(
        database=config['DATABASE_NAME'],
        user=config['DATABASE_NAME'],
        host=config['DATABASE_HOST'],
        password=config['DATABASE_PASSWORD'],
        port=config['DATABASE_PORT'],
    )


def _fetch_customers(query, login):
    """Run a one-parameter SELECT and return the non-empty values of its first row.

    Returns an empty list when the query yields no row. The value is passed
    as a bound parameter, never interpolated into the SQL text.
    """
    conn = _connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (login,))
            row = cur.fetchone()
    finally:
        conn.close()
    return [value for value in (row or ()) if value]


def _store_customers(login, customers):
    """Write *customers* into the customer columns of the user row for *login*."""
    # Column names come from a module constant; only values are parameters.
    columns = _CUSTOMER_COLUMNS[:len(customers)]
    assignments = ', '.join('{}=%s'.format(column) for column in columns)
    query = 'UPDATE users SET ' + assignments + ' WHERE login=%s'
    conn = _connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(query, tuple(customers) + (login,))
        conn.commit()
    finally:
        conn.close()


def _find_member_of(username):
    """Return the group-membership attribute (as text) of *username*'s directory entry.

    Binds with the LDAP_BINDDN / LDAP_PW service account, runs the configured
    search once, and returns the LDAP_ATTRS_MEMBEROF attribute of the first
    entry whose LDAP_ATTRS_DN attribute contains *username*
    (case-insensitive). Returns None when no entry matches.
    """
    config = current_app.config
    # NOTE(review): this is a substring match, as before, so a short user
    # name can match another user's entry -- consider an exact attribute
    # comparison or a per-user search filter.
    pattern = re.compile(re.escape(username), re.I)
    server = ldap3.Server(config['LDAP_URL'])
    with ldap3.Connection(server, config['LDAP_BINDDN'], config['LDAP_PW']) as conn:
        conn.search(config['LDAP_BASE'], config['LDAP_FILTER'],
                    attributes=config['LDAP_ATTRS'])
        for entry in conn.entries:
            if pattern.search(str(entry[config['LDAP_ATTRS_DN']])):
                return str(entry[config['LDAP_ATTRS_MEMBEROF']])
    return None


def _match_role_rule(member_of):
    """Return ``(role, text, customers)`` of the first rule matching *member_of*, or None."""
    for group_names, role, text, customers in _GROUP_ROLE_RULES:
        if any(re.search(re.escape(name), member_of, re.I) for name in group_names):
            return role, text, customers
    return None


def _issue_token(user, login, groups):
    """Resolve scopes and customers, record the audit event, and return the token response."""
    scopes = Permission.lookup(login=login, roles=user.roles)
    customers = get_customers(login=login, groups=[user.domain] + groups)
    auth_audit_trail.send(current_app._get_current_object(), event='basic-ldap-login',
                          message='user login via LDAP', user=login, customers=customers,
                          scopes=scopes, resource_id=user.id, type='user', request=request)
    token = create_token(user_id=user.id, name=user.name, login=user.email, provider='ldap',
                         customers=customers, scopes=scopes, roles=user.roles,
                         email=user.email, email_verified=user.email_verified)
    return jsonify(token=token.tokenize)


@auth.route('/auth/login', methods=['OPTIONS', 'POST'])
@cross_origin(supports_credentials=True)
def login():
    """Authenticate a user against LDAP and return an Alerta token.

    The JSON body must contain ``password`` and either ``username`` or
    ``email``. The credentials are verified with an LDAP simple bind.
    A user already known to Alerta gets a token built from the customers
    stored in the ``users`` table. An unknown user is looked up in the
    directory, created with the role of the first matching rule in
    ``_GROUP_ROLE_RULES``, has that rule's customers stored, and then gets
    a token.

    Raises:
        ApiError: 401 when credentials are missing or rejected by the bind;
            403 when the user is not found in the directory search or is in
            none of the mapped groups; 500 for any other LDAP failure
            during the bind.
    """
    # ldap3 is already a dependency of this module; only the exception class
    # is imported here so the block stays self-contained.
    from ldap3.core.exceptions import LDAPBindError

    # Allow LDAP server to use a self signed certificate
    # NOTE(review): this sets a python-ldap option, while the connections
    # below are made with ldap3 -- it presumably has no effect on them.
    if current_app.config['LDAP_ALLOW_SELF_SIGNED_CERT']:
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)

    # Retrieve required fields from client request
    try:
        login_id = request.json.get('username', None) or request.json['email']
        password = request.json['password']
    except (KeyError, TypeError, AttributeError):
        # Missing key, or a body that is not a JSON object.
        raise ApiError("must supply 'username' and 'password'", 401)
    if not login_id or not isinstance(login_id, str):
        raise ApiError("must supply 'username' and 'password'", 401)
    if not password:
        raise ApiError("invalid username or password 1", 401)

    if '\\' in login_id:
        # DOMAIN\user form; only the user part is used below.
        _domain, username = login_id.split('\\', 1)
        # NOTE(review): the bind below uses ``email`` as the user name, so
        # this form binds with an empty user -- confirm it is supported.
        email = ''
        email_verified = False
    else:
        username = login_id
        # NOTE(review): no '@' separator is inserted here -- confirm the
        # configured suffix already contains it.
        email = login_id + 'mydomain.com'
        email_verified = True

    # Verify the credentials with a simple bind, then release the connection.
    try:
        server = Server(current_app.config['LDAP_URL'], use_ssl=True, get_info=ALL)
        ldap_connection = Connection(server, user=email, password=password, auto_bind=True,
                                     version=3, authentication='SIMPLE',
                                     client_strategy='SYNC', auto_referrals=True,
                                     check_names=True, read_only=False, lazy=False,
                                     raise_exceptions=False)
        ldap_connection.unbind()
    except LDAPBindError:
        # Raised by ldap3 when the automatic bind is not successful.
        raise ApiError('invalid username or password', 401)
    except Exception as e:
        raise ApiError(str(e), 500)

    # casefold() allows authentication not to be case sensitive
    login_id = login_id.casefold()
    email = email.casefold()

    user = User.find_by_username(username=login_id)
    if user:
        if user.roles in _DUAL_CUSTOMER_ROLES:
            # The last-login update was already disabled for these roles.
            groups = _fetch_customers(_SELECT_CUSTOMERS_BY_LOGIN, login_id)
        else:
            user.update_last_login()
            groups = _fetch_customers(_SELECT_CUSTOMER_BY_NAME, login_id)
        return _issue_token(user, login_id, groups)

    # Unknown user: find the directory entry and derive role and customers.
    member_of = _find_member_of(username)
    if member_of is None:
        raise ApiError('user not found in LDAP directory', 403)
    rule = _match_role_rule(member_of)
    if rule is None:
        raise ApiError('user is not a member of an authorized LDAP group', 403)
    role, text, customers = rule

    user = User(name=username, login=login_id, password='', email=email,
                roles=[role], text=text, email_verified=email_verified)
    user = user.create()
    _store_customers(login_id, customers)
    return _issue_token(user, login_id, list(customers))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment