Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save shollingsworth/0e11b139ba4c6ef136be34d6eaf75975 to your computer and use it in GitHub Desktop.
Save shollingsworth/0e11b139ba4c6ef136be34d6eaf75975 to your computer and use it in GitHub Desktop.
Dump nested Microsoft group memberships using the ldap3 Python library and write them to files.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Enumerate LDAP nested Group Membership, from list of group names in json file"""
import json
import logging
import os
from pathlib import Path
import ssl
from typing import List
import ldap3
from ldap3.core.server import Server
logging.basicConfig()
LOG = logging.getLogger("ldap")
LOG.setLevel(logging.INFO)

# Filesystem layout, all anchored at the directory containing this script.
BASE_DIR = Path(__file__).parent
ERROR_JSON = BASE_DIR.joinpath("error.json")
DATADIR = BASE_DIR / "data"
LDAP_SAVE = DATADIR / "ldap"
LDAP_SAVE.mkdir(exist_ok=True, parents=True)

# groups.json is iterated by main(); each item is used as a group cn.
GRPFILE = DATADIR.joinpath("groups.json")
# read_text() opens and closes the file itself, so no handle is leaked.
GRPJSON = json.loads(GRPFILE.read_text())

# Connection settings come from the environment. The fallback values are
# placeholders only: the check below refuses to continue unless every
# variable is set to a non-empty value, so they are never used to connect.
LDAP_BASE_DN = os.environ.get("LDAP_BASE_DN", "dc=example,dc=com")
LDAP_USER = os.environ.get("LDAP_USER", "user@domain")
LDAP_PASS = os.environ.get("LDAP_PASS", "password")
LDAP_URI = os.environ.get("LDAP_URI", "ldaps://ldap.example.com")

# Fail fast, naming every unset/empty variable at once.
missing = [
    x
    for x in ["LDAP_BASE_DN", "LDAP_USER", "LDAP_PASS", "LDAP_URI"]
    if not os.environ.get(x)
]
if missing:
    raise ValueError(f"Missing environment variables: {', '.join(missing)}")
class LDAP:
    """Thin wrapper around an authenticated ``ldap3`` connection.

    Instantiating the class connects to the host named in ``LDAP_URI`` over
    TLS and binds as ``LDAP_USER``. All searches run under ``LDAP_BASE_DN``.
    """

    def __init__(self):
        """Create the TLS server definition and the bound connection.

        The connection is created with ``auto_bind=True`` and
        ``raise_exceptions=True``.
        """
        LOG.info("Connecting to LDAP: %s", LDAP_URI)
        # Keep only the authority part: "ldaps://host[:port]/x" -> "host[:port]".
        # Splitting on "://" first means a bare "host" (no scheme) is accepted
        # too instead of failing with an IndexError.
        fqdn = LDAP_URI.split("://", 1)[-1].split("/", 1)[0]
        tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)
        server = Server(host=fqdn, use_ssl=True, tls=tls_config, get_info=ldap3.ALL)
        self.conn = ldap3.Connection(
            server,
            user=LDAP_USER,
            password=LDAP_PASS,
            auto_bind=True,  # type: ignore
            raise_exceptions=True,
        )

    def _get_res(self, query: str, attrs: List[str]):
        """Run *query* under ``LDAP_BASE_DN`` and return the resulting entries.

        Args:
            query: LDAP search filter.
            attrs: Names of the attributes to request for each entry.

        Returns:
            ``self.conn.entries`` as populated by the search.
        """
        self.conn.search(
            search_base=LDAP_BASE_DN,
            search_filter=query,
            attributes=attrs,
        )
        return self.conn.entries

    def group_members(self, name):
        """Return the sAMAccountName of every user in group *name*.

        Membership is resolved through nested groups.

        Args:
            name: Value of the group's ``cn`` attribute.

        Returns:
            List of ``sAMAccountName`` values as strings.

        Raises:
            LookupError: If no entry with ``cn=name`` is found.
        """
        # Imported here so the block is self-contained; it is part of ldap3,
        # which this module already depends on.
        from ldap3.utils.conv import escape_filter_chars

        # Escape the value so characters such as "(", ")", "*" and "\" in a
        # group name cannot break or alter the filter.
        safe_name = escape_filter_chars(str(name))
        search_group_dn = f"(&(objectClass=*)(cn={safe_name}))"
        LOG.debug("Using Query:\n%s", search_group_dn)
        # A single lookup is enough; _get_res performs the search itself.
        entries = self._get_res(search_group_dn, ["distinguishedName"])
        if not entries:
            raise LookupError(f"No LDAP entry found with cn={name}")
        dn = entries[0].distinguishedName
        LOG.info("Found DN: %s", dn)

        # nested group search: 1.2.840.113556.1.4.1941 is Active Directory's
        # LDAP_MATCHING_RULE_IN_CHAIN. The DN is escaped as well because DNs
        # may legitimately contain backslashes or parentheses.
        safe_dn = escape_filter_chars(str(dn))
        search = (
            "(&(objectClass=user)"
            f"(memberOf:1.2.840.113556.1.4.1941:={safe_dn}))"
        )
        LOG.debug("Using Query:\n%s", search)
        # NOTE(review): this is a plain (non-paged) search; very large groups
        # may hit a server-side size limit -- confirm against the directory.
        entries = self._get_res(search, ["sAMAccountName"])
        return [str(x.sAMAccountName) for x in entries]
def main():
    """Dump the members of every group in GRPJSON to its own JSON file.

    Each group's member list is written to ``LDAP_SAVE/<group>.json``. A
    failure for one group is recorded and the loop moves on; the collected
    failures are written to ``ERROR_JSON`` at the end. Ctrl-C stops the run
    without writing the error file.
    """
    client = LDAP()
    failures = []
    try:
        for group_name in GRPJSON:
            try:
                members = client.group_members(group_name)
                LOG.info("Saving %s, length: %s", group_name, len(members))
                target = LDAP_SAVE / f"{group_name}.json"
                target.write_text(json.dumps(members))
            except Exception as exc:  # pylint: disable=broad-except
                # Keep going: remember what went wrong for this group only.
                failures.append({group_name: str(exc)})
                LOG.error("Error getting group members for %s", group_name)
    except KeyboardInterrupt:
        LOG.info("Exiting...")
        return

    if not failures:
        LOG.info("No errors found")
        return

    with ERROR_JSON.open("w") as handle:
        json.dump(failures, handle)
    LOG.error("Errors found, see %s", ERROR_JSON)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment