Last active
June 23, 2018 18:51
-
-
Save dsoprea/3ae3f2b085dc0cb30294e4dd6eb308df to your computer and use it in GitHub Desktop.
LDAP Lookups Against Active Directory with Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import ldap | |
import collections | |
# Install:
#
#   apt (system package): libsasl2-dev
#   pip (Python package): python-ldap
#
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# LDAP search-filter templates; the placeholder is filled in with str.format().
_USER_QUERY = (
    '(&'
    '(objectClass=USER)'
    '(sAMAccountName={username})'
    ')'
)
_GROUP_QUERY = (
    '(&'
    '(objectClass=GROUP)'
    '(cn={group_name})'
    ')'
)
# Record describing one user entry:
#   dn         -- distinguished name of the entry
#   attributes -- attribute mapping returned with the entry
#   groups     -- the entry's 'memberOf' values
_USER = collections.namedtuple('_USER', ('dn', 'attributes', 'groups'))
class NotFoundException(Exception):
    """Signals that a directory lookup matched no entry."""
class LdapAdapter(object):
    """User and group lookups against an LDAP directory.

    The connection is opened and bound (simple bind) the first time a query
    needs it, and is then reused for the lifetime of the instance.
    """

    def __init__(self, host_and_port, username, password, base_dn):
        """Remember the connection settings; nothing is contacted here.

        host_and_port: appended to 'ldap://' to form the server URI.
        username: identity used for the simple bind.
        password: password used for the simple bind.
        base_dn: DN under which every subtree search is rooted.
        """
        self.__host_and_port = host_and_port
        self.__username = username
        self.__password = password
        self.__base_dn = base_dn

        # Populated lazily by the __resource property.
        self.__raw_resource = None

    @property
    def __resource(self):
        """Return the bound connection, creating it on first access."""
        if self.__raw_resource is None:
            self.__raw_resource = self.__auth()

        return self.__raw_resource

    @staticmethod
    def _ldap_error_description(error):
        """Return the 'desc' text carried by an LDAP error, or None.

        The payload is read from `error.args[0]` rather than `error.message`:
        the latter does not exist on Python 3, while `args` is available on
        both Python 2 and 3.
        """
        payload = error.args[0] if error.args else None
        if isinstance(payload, dict) and 'desc' in payload:
            return payload['desc']

        return None

    def __auth(self):
        """Open a connection, bind with the stored credentials, return it.

        Raises:
            ldap.INVALID_CREDENTIALS, ldap.SERVER_DOWN: propagated unchanged.
            Exception: "LDAP: <desc>" for any other LDAP error that carries a
                'desc' entry.
            ldap.LDAPError: re-raised unchanged when no 'desc' is available.
        """
        conn = ldap.initialize('ldap://' + self.__host_and_port)
        conn.protocol_version = 3

        # Do not let the client library chase referrals automatically.
        conn.set_option(ldap.OPT_REFERRALS, 0)

        try:
            conn.simple_bind_s(self.__username, self.__password)
        except ldap.INVALID_CREDENTIALS:
            # Pinned, for the future.
            raise
        except ldap.SERVER_DOWN:
            # Pinned, for the future.
            raise
        except ldap.LDAPError as e:
            # The exception must be bound to a name here; without `as e` the
            # handler itself would fail with a NameError.
            _LOGGER.error("LDAP error content:\n%s", e)

            description = self._ldap_error_description(e)
            if description is not None:
                raise Exception("LDAP: {}".format(description))

            raise

        return conn

    def get_dn_by_username(self, username):
        """Return user information. See _USER.

        Raises:
            NotFoundException: no entry matched `username`.
            AssertionError: more than one entry matched.
        """
        # Local import: only the top-level `ldap` package is imported by the
        # module, and `ldap.filter` is a submodule.
        from ldap.filter import escape_filter_chars

        # Escape the value so characters such as '*', '(' and ')' are matched
        # literally instead of altering the search filter.
        uf = _USER_QUERY.format(username=escape_filter_chars(username))

        results_raw = \
            self.__resource.search_s(
                self.__base_dn,
                ldap.SCOPE_SUBTREE,
                uf)

        # Rows without a DN are skipped (presumably search references rather
        # than real entries). An entry may lack 'memberOf' entirely, so fall
        # back to an empty list instead of raising KeyError.
        users = [
            _USER(
                dn=dn,
                attributes=attributes,
                groups=attributes.get('memberOf', []))
            for dn, attributes in results_raw
            if dn is not None
        ]

        if not users:
            raise NotFoundException(username)

        # Raised explicitly (rather than via `assert`) so the check is still
        # enforced when Python runs with -O.
        if len(users) != 1:
            raise AssertionError(
                "More than one result was found for user [{}], which doesn't "
                "make sense: {}".format(username, users))

        return users[0]

    def get_group_members(self, group_name):
        """Return a list of DNs.

        Raises:
            NotFoundException: no entry matched `group_name`.
            AssertionError: more than one entry matched.
        """
        # Local import: only the top-level `ldap` package is imported by the
        # module, and `ldap.filter` is a submodule.
        from ldap.filter import escape_filter_chars

        # Escape the value so characters such as '*', '(' and ')' are matched
        # literally instead of altering the search filter.
        gf = _GROUP_QUERY.format(group_name=escape_filter_chars(group_name))

        results_raw = \
            self.__resource.search_s(
                self.__base_dn,
                ldap.SCOPE_SUBTREE,
                gf)

        # Rows without a DN are skipped (presumably search references rather
        # than real entries). A group entry may lack 'member' entirely, so
        # fall back to an empty list instead of raising KeyError.
        member_sets = [
            attributes.get('member', [])
            for dn, attributes in results_raw
            if dn is not None
        ]

        if not member_sets:
            raise NotFoundException(group_name)

        # Raised explicitly (rather than via `assert`) so the check is still
        # enforced when Python runs with -O.
        if len(member_sets) != 1:
            raise AssertionError(
                "Too many sets of results returned: {}".format(member_sets))

        return member_sets[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment