Created
April 9, 2022 00:29
-
-
Save shollingsworth/0e11b139ba4c6ef136be34d6eaf75975 to your computer and use it in GitHub Desktop.
Dump nested Microsoft (Active Directory) group memberships using the ldap3 Python library and write them to files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """Enumerate LDAP nested Group Membership, from list of group names in json file""" | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| import ssl | |
| from typing import List | |
| import ldap3 | |
| from ldap3.core.server import Server | |
# --- Logging -----------------------------------------------------------------
logging.basicConfig()
LOG = logging.getLogger("ldap")
LOG.setLevel(logging.INFO)

# --- Filesystem layout (everything is relative to this script) ---------------
BASE_DIR = Path(__file__).parent
ERROR_JSON = BASE_DIR.joinpath("error.json")  # per-group failures are written here
DATADIR = BASE_DIR / "data"
LDAP_SAVE = DATADIR / "ldap"  # one "<group>.json" file per group is written here
LDAP_SAVE.mkdir(exist_ok=True, parents=True)

# --- Input: the group names to enumerate -------------------------------------
GRPFILE = DATADIR.joinpath("groups.json")
# read_text() opens and closes the file itself; the previous
# json.load(GRPFILE.open()) left the file handle open.
GRPJSON = json.loads(GRPFILE.read_text())

# --- Connection settings (all four environment variables are mandatory) ------
# A variable that is unset OR empty counts as missing. The placeholder defaults
# that used to be passed to os.environ.get() could never take effect because of
# this check, so they were removed.
missing = [
    x
    for x in ["LDAP_BASE_DN", "LDAP_USER", "LDAP_PASS", "LDAP_URI"]
    if not os.environ.get(x)
]
if missing:
    raise ValueError(f"Missing environment variables: {', '.join(missing)}")

LDAP_BASE_DN = os.environ["LDAP_BASE_DN"]
LDAP_USER = os.environ["LDAP_USER"]
LDAP_PASS = os.environ["LDAP_PASS"]
LDAP_URI = os.environ["LDAP_URI"]
class LDAP:
    """Authenticated ``ldap3`` connection with a nested-group-membership lookup.

    Connection settings are taken from the module-level ``LDAP_URI``,
    ``LDAP_USER``, ``LDAP_PASS`` and ``LDAP_BASE_DN`` values.
    """

    def __init__(self):
        """Connect to the server named in ``LDAP_URI`` over TLS and bind.

        The connection is created with ``auto_bind=True`` and
        ``raise_exceptions=True`` and stored on ``self.conn``.
        """
        from urllib.parse import urlsplit

        LOG.info("Connecting to LDAP: %s", LDAP_URI)
        # "ldaps://host[:port]/..." -> "host[:port]". A value without a scheme
        # has no network-location part, so it is used as the host unchanged
        # (the previous split("/")[2] raised IndexError for such values).
        fqdn = urlsplit(LDAP_URI).netloc or LDAP_URI
        # NOTE(review): ssl.PROTOCOL_TLSv1_2 is deprecated since Python 3.10;
        # kept as-is because changing the protocol selection may affect which
        # servers can be reached - confirm before modernising.
        tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)
        server = Server(host=fqdn, use_ssl=True, tls=tls_config, get_info=ldap3.ALL)
        self.conn = ldap3.Connection(
            server,
            user=LDAP_USER,
            password=LDAP_PASS,
            auto_bind=True,  # type: ignore
            raise_exceptions=True,
        )

    def _get_res(self, query: str, attrs: List[str]):
        """Search below ``LDAP_BASE_DN`` and return the connection's entries.

        Args:
            query: LDAP search filter; values embedded in it must already be
                escaped by the caller.
            attrs: names of the attributes to fetch for each entry.

        Returns:
            ``self.conn.entries`` as populated by the search.

        NOTE(review): no paging control is requested here; very large result
        sets may be subject to a server-side size limit - confirm.
        """
        self.conn.search(
            search_base=LDAP_BASE_DN,
            search_filter=query,
            attributes=attrs,
        )
        return self.conn.entries

    def group_members(self, name):
        """Return the ``sAMAccountName`` of every user in group *name*.

        The group is located by its ``cn``; members are then collected with
        the ``1.2.840.113556.1.4.1941`` matching rule so that users who belong
        only through nested groups are included as well.

        Args:
            name: common name (``cn``) of the group.

        Returns:
            A list of ``sAMAccountName`` strings.

        Raises:
            IndexError: if no entry with that ``cn`` exists.
        """
        from ldap3.utils.conv import escape_filter_chars

        # Escape the name so characters such as "(", ")", "*" or "\" inside a
        # group name cannot break or alter the filter.
        search_group_dn = f"(&(objectClass=*)(cn={escape_filter_chars(str(name))}))"
        LOG.debug("Using Query:\n%s", search_group_dn)
        # One lookup only; the same search used to be sent to the server twice.
        entries = self._get_res(search_group_dn, ["distinguishedName"])
        if not entries:
            # Still an IndexError (what entries[0] raised before) so existing
            # handlers keep matching, but with a message that names the group.
            raise IndexError(f"No LDAP entry found with cn={name}")
        dn = str(entries[0].distinguishedName)
        LOG.info("Found DN: %s", dn)
        # nested group search; the DN is escaped because it is used as a
        # filter value and may itself contain filter metacharacters.
        search = (
            "(&(objectClass=user)"
            f"(memberOf:1.2.840.113556.1.4.1941:={escape_filter_chars(dn)}))"
        )
        LOG.debug("Using Query:\n%s", search)
        entries = self._get_res(search, ["sAMAccountName"])
        return [str(x.sAMAccountName) for x in entries]
def main():
    """Dump the members of every group in ``GRPJSON`` to ``LDAP_SAVE``.

    For each group a ``<group>.json`` file containing the member account
    names is written. A failure for one group is recorded and does not stop
    the run; if any failures were recorded they are written to ``ERROR_JSON``.
    On Ctrl-C the function logs and returns without writing ``ERROR_JSON``.
    The LDAP connection is unbound before the function returns, however the
    loop ends.
    """
    obj = LDAP()
    errors = []
    try:
        for grp in GRPJSON:
            try:
                grp_members = obj.group_members(grp)
                LOG.info("Saving %s, length: %s", grp, len(grp_members))
                LDAP_SAVE.joinpath(f"{grp}.json").write_text(json.dumps(grp_members))
            except Exception as err:  # pylint: disable=broad-except
                # Best effort: remember the failure and carry on with the
                # remaining groups.
                errors.append({grp: str(err)})
                # Include the cause in the log; previously it was only
                # visible in error.json.
                LOG.error("Error getting group members for %s: %s", grp, err)
    except KeyboardInterrupt:
        LOG.info("Exiting...")
        return
    finally:
        # Release the server connection on every exit path (normal
        # completion, Ctrl-C, or an unexpected error).
        obj.conn.unbind()
    if errors:
        with ERROR_JSON.open("w") as f:
            json.dump(errors, f)
        LOG.error("Errors found, see %s", ERROR_JSON)
    else:
        LOG.info("No errors found")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment