Created
October 14, 2019 14:09
-
-
Save jbaker10/4d03616910b86a5f7e24bbc0dab37023 to your computer and use it in GitHub Desktop.
Code to help sync users from G Suite to Active Directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT License | |
# Copyright (c) 2019 Jeremy Baker | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import logging
import os
import re
import secrets
import unicodedata

from ldap3 import Server, ServerPool, Connection, ALL, MODIFY_REPLACE, ALL_ATTRIBUTES, SUBTREE, FIRST
from ldap3.core.exceptions import *
# Service-account credentials used for the LDAP bind. They are read from the
# environment so that secrets do not have to be committed to the file; when the
# variables are unset the values fall back to the original empty strings.
AD_USERNAME = os.environ.get("AD_USERNAME", "")
AD_PASSWORD = os.environ.get("AD_PASSWORD", "")

# Search base for listing users and parent container for newly created users.
# The value is a placeholder and must be replaced with a real distinguished name.
AD_OU = "DN,DOMAIN,COM"

# Hosts that LDAP.bind() combines into a single ldap3 server pool.
DOMAIN_CONTROLLERS = ["DC1.domain.com", "DC2.domain.com"]

# Root logger, shared by every function and method in this file.
log = logging.getLogger()
def create_username(self, email=None):
    """
    Helper function that takes an email and returns an AD appropriate username

    The leading ``self`` parameter is a leftover from when this helper was a
    method. It is kept so existing two-argument calls keep working, but the
    function may also be called with the email as its only argument.

    Accented characters are reduced to their ASCII base letter using the
    standard library (NFKD decomposition); characters with no ASCII
    decomposition are dropped.

    :param self: ignored when `email` is given; otherwise treated as the email
    :param email: the email that should be concatenated to a username
    :return: a username that is the string in front of the `@` symbol without
             apostrophes, hyphens or non-ASCII characters, or "" on failure
    """
    # Support the single-argument call style: create_username("a@b.com")
    if email is None:
        email = self
    if not isinstance(email, str):
        log.error("Unable to create username from email {}".format(email))
        return ""
    local_part = email.split("@")[0]
    # Decompose accented characters, then drop anything that is not plain ASCII
    ascii_local_part = unicodedata.normalize("NFKD", local_part).encode("ascii", "ignore").decode("ascii")
    username = re.sub("['-]", "", ascii_local_part)
    if not username:
        log.error("Unable to create username from email {}".format(email))
    return username
def generate_password(length=30, complex_password=True):
    """
    Generates a secure password and ensures that the password contains
    lower-case letters, upper-case letters and digits (plus special symbols
    if complex_password=True)

    Characters are drawn with the `secrets` module. Visually ambiguous
    characters (0, O, I, l, i, L, o) are not part of the alphabet.

    :param length: (int) the number of alphanumeric characters to generate
    :param complex_password: (bool) if True, two special characters are
                             appended, so the result is `length + 2` long
    :return: (str) new password
    :raises Exception: if no candidate satisfied every character-class
                       requirement within 20 attempts (e.g. `length` is too
                       small to hold one character of each class)
    """
    charset = "ABCDEFGHJKMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz123456789"
    special_chars = "!@#$%^&*()"
    required_patterns = ("[a-z]", "[0-9]", "[A-Z]")
    for _attempt in range(20):
        password = "".join(secrets.choice(charset) for _ in range(length))
        if complex_password:
            # The special-character requirement is met by construction
            password += "".join(secrets.choice(special_chars) for _ in range(2))
        if all(re.search(pattern, password) for pattern in required_patterns):
            return password
    # Raised for both modes: a non-complex request must never fall through to
    # a password containing special characters
    raise Exception("Unable to create secure password after 20 attempts")
class LDAP:
    """
    Small wrapper around an ldap3 connection used to list and create users
    in Active Directory
    """

    def __init__(self):
        # Set by bind(); stays None until a connection has been established
        self.conn = None
        # Error details collected by create_user() for later inspection
        self.status = {"errors": []}

    def bind(self):
        """
        Function that can be called to bind to a list of domain controllers as a pool

        The connection is stored on `self.conn`.

        :raises Exception: if the connection or bind fails for any reason
        """
        connect_timeout = 120
        log.info("Trying to connect to domain controllers: {}".format(DOMAIN_CONTROLLERS))
        server_pool_list = [
            Server(host=dc, get_info=ALL, port=636, use_ssl=True, connect_timeout=connect_timeout)
            for dc in DOMAIN_CONTROLLERS
        ]
        server_pool = ServerPool(server_pool_list, FIRST, active=True, exhaust=120)
        try:
            # auto_bind=True opens and binds the connection in the constructor,
            # so no separate bind() call is needed afterwards
            self.conn = Connection(
                server_pool, auto_bind=True, user=AD_USERNAME, password=AD_PASSWORD, raise_exceptions=True
            )
        except LDAPTimeLimitExceededResult as e:
            raise Exception(
                "Unable to establish a connection with the LDAP server, timed out after {} seconds".format(
                    connect_timeout
                )
            ) from e
        except Exception as e:
            raise Exception(
                "Unable to create an LDAP connection to provision users in Active Directory. The error was: {}".format(
                    str(e)
                )
            ) from e

    def list_users(self, ou, attributes=ALL_ATTRIBUTES):
        """
        List all users in a given OU or CN

        :param ou: The DN path where you want the listing to occur
        :param attributes: a list of attributes that you would like returned in the query
                           (must contain at least userPrincipalName), defaults to ALL
        :return: a dictionary of returned objects from the specified search DN with the key
                 set to the user's UPN and value their AD record; entries without a UPN are skipped
        :raises Exception: if userPrincipalName is not requested or the search fails
        """
        if attributes != ALL_ATTRIBUTES and "userPrincipalName" not in attributes:
            raise Exception("The attribute 'userPrincipalName' must be included in the attributes list passed")
        try:
            # NOTE(review): this filter may also match non-person objects that
            # carry the user object class - confirm it is narrow enough
            users = self.conn.extend.standard.paged_search(
                search_base=ou,
                search_filter="(objectClass=User)",
                search_scope=SUBTREE,
                attributes=attributes,
                paged_size=100,
                generator=False,
            )
        except Exception as e:
            raise Exception("Unable to list users in AD. The error was: {}".format(str(e))) from e
        if self.conn.result.get("description") != "success":
            raise Exception("Unable to list users in AD. The error was: {}".format(self.conn.result))
        if not users:
            log.info("No users were found in Active Directory, but got a good response. Proceeding.")
            return {}
        ad_users = {}
        for user in users:
            # we key each record by UPN; the dictionary is primarily a lookup
            # reference to know if the user was already in AD or not
            upn = user.get("attributes", {}).get("userPrincipalName", "")
            if isinstance(upn, (list, tuple)):
                # a multi-valued representation cannot be used as a dict key
                upn = upn[0] if upn else ""
            if not upn:
                log.warning(
                    "Unable to add user {} to ad_users list. The error was: {}".format(
                        user.get("dn", ""), "no userPrincipalName on the record"
                    )
                )
                continue
            ad_users[upn] = user
        return ad_users

    def create_user(self, ldap_user_attributes, ou):
        """
        Create a new user in Active Directory with the supplied attributes

        :param ldap_user_attributes: dict of LDAP attributes for the new user; "cn" must be a
                                     non-empty list whose first entry becomes the CN of the DN
        :param ou: the path in AD for the user to be created
        :return: bool status of whether the creation succeeded or not (True is also returned
                 when the user already exists)
        """
        try:
            username = ldap_user_attributes["cn"][0]  # cn is a list, we need the first entry
        except (KeyError, IndexError, TypeError):
            # Do not log the whole record: it contains the generated password
            log.error(
                "Unable to retrieve the CN for record {}".format(
                    ldap_user_attributes.get("userPrincipalName", "")
                    if isinstance(ldap_user_attributes, dict)
                    else ""
                )
            )
            return False
        log.debug("Attempting to create AD user {}".format(username))
        # NOTE(review): username is interpolated unescaped; confirm the caller
        # strips DN-special characters such as ',' and '+'
        dn = "CN={},{}".format(username, ou)
        log.debug("The DN will be set to {}".format(dn))
        try:
            resp = self.conn.add(dn, attributes=ldap_user_attributes)
        except LDAPEntryAlreadyExistsResult:
            log.warning("The user {} already exists in Active Directory, skipping".format(username))
            ## we return True here since the user does already exist in the domain
            return True
        except LDAPConstraintViolationResult as e:
            log.error("Unable to create account {} due to a Constraint Violation".format(username))
            self.status.setdefault("errors", []).append(str(e))
            return False
        except Exception as e:
            log.error(
                "An unexpected error occurred while trying to create user {}. The error was: {}".format(
                    username, str(e)
                )
            )
            return False
        if not resp:
            log.error(
                "Unable to create user {}. The error was: {}".format(
                    ldap_user_attributes.get("userPrincipalName", ""), self.conn.result
                )
            )
            return False
        return True
def _build_ldap_attributes(user, username, upn):
    """
    Build the attribute dictionary used to create one AD user

    :param user: the G Suite user record (dict)
    :param username: the sanitized account name used for cn, sAMAccountName and displayName
    :param upn: the userPrincipalName to assign to the new account
    :return: dict of LDAP attributes ready to hand to LDAP.create_user
    """
    ldap_user_attributes = {
        "objectClass": ["top", "person", "organizationalPerson", "user"],
        "cn": [username],
        "sAMAccountName": [username[:20]],  # we need to strip the sAMAccountName to 20 chars
        "displayName": [username],
        "mail": [user.get("primaryEmail", "")],
        "userPrincipalName": [upn],
        # mind the extra set of quotes in the unicodePwd value, this is required, do not change
        "unicodePwd": '"{}"'.format(generate_password()).encode(
            "utf-16-le"  # this is very specific to MS AD, so this needs be the encoding
        ),
        "userAccountControl": "66048",  # userAccountControl mappings: https://vaportech.wordpress.com/2007/12/06/useraccountcontrol/
    }
    # employeeName is normally the 'First Last' name of a user
    employee_name = user.get("employeeName", "")
    if employee_name:
        name_split = employee_name.split(" ")
        ldap_user_attributes["givenName"] = [name_split[0]]
        surname = " ".join(name_split[1:])
        if surname:
            # a single-word name has no surname; do not send an empty value
            ldap_user_attributes["sn"] = [surname]
    if user.get("jobTitle", ""):
        ldap_user_attributes["title"] = [user.get("jobTitle", "")]
    if user.get("department", ""):
        ldap_user_attributes["department"] = [user.get("department", "")]
    return ldap_user_attributes


def main(gsuite_users=None):
    """
    Create an AD account for every supplied G Suite user that is not already in AD_OU

    :param gsuite_users: iterable of G Suite user records, where each user is a JSON dict
                         representation of the user response from G Suite. For reference, see
                         https://developers.google.com/admin-sdk/directory/v1/reference/users
                         When omitted, nothing is synced.
    :return: None
    """
    if gsuite_users is None:
        log.warning("No G Suite users were supplied, nothing to sync")
        gsuite_users = []
    ## create the LDAP object
    ldap = LDAP()
    ## bind to the domain controllers
    ldap.bind()
    # get a list of all current LDAP users (specifically their UPN) in a given OU
    current_ad_users = ldap.list_users(ou=AD_OU, attributes=["userPrincipalName"])
    # lower-cased copy of the known UPNs so the comparison ignores case
    existing_upns = {str(known_upn).lower() for known_upn in current_ad_users}
    for user in gsuite_users:
        email = user.get("primaryEmail", "")
        ## get a username that will be used as the UPN when creating the user in AD
        username = create_username(email)
        if not username or "@" not in email:
            log.error(
                "Unable to create the LDAP resource for user record {}. The error was: {}".format(
                    email, "missing or malformed primaryEmail"
                )
            )
            continue
        ## this assumes your G Suite domain and AD domain (used with the UPN) are the same
        upn = "{}@{}".format(username, email.split("@", 1)[1])
        ## accounts may be stored under the raw email or the sanitized UPN, so check both
        if email.lower() in existing_upns or upn.lower() in existing_upns:
            log.warning("User {} already exists in AD, moving on".format(username))
            continue
        try:
            ldap_user_attributes = _build_ldap_attributes(user, username, upn)
        except Exception as e:
            log.error("Unable to create the LDAP resource for user record {}. The error was: {}".format(email, e))
            continue
        ## attempt to create the new user record in AD
        try:
            created = ldap.create_user(ldap_user_attributes=ldap_user_attributes, ou=AD_OU)
        except Exception as e:
            log.error("An error occurred while trying to create {}. The error was: {}".format(upn, e))
            continue
        # create_user logs the reason itself when it returns False
        if created:
            log.info("Successfully created {} in {}".format(username, AD_OU))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment