Last active
October 4, 2023 16:09
-
-
Save simonLeary42/20c2dc56e9f1f607a16df845c7e49c60 to your computer and use it in GitHub Desktop.
python LDAP user editor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import atexit | |
| import ldap3 | |
# Bind credentials for the LDAP connection below. Both are empty strings in
# this file -- presumably meant to be filled in before use (TODO confirm).
__LDAP_USER = ""
__LDAP_PASSWORD = ""
# Prefix that lock_user_ssh prepends to (and unlock_user_ssh strips from)
# each sshPublicKey value. It is a bytes literal, so the key values are
# assumed to come back from LDAP as bytes -- TODO confirm against the schema.
__LOCKED_STR = b"LOCKED"
# Base DN for user entries; a user's DN is built as "cn=<username>,<__USERS>".
__USERS = "ou=users,dc=unity,dc=rc,dc=umass,dc=edu"
# Module-level connection shared by every function in this file.
conn = ldap3.Connection(
    ldap3.Server("ldap://identity"),
    user=__LDAP_USER,
    password=__LDAP_PASSWORD
)
# NOTE(review): the return value of bind() is not inspected here.
conn.bind()
atexit.register(conn.unbind) # cleanup after yourself
def __check_ldap_result() -> None:
    """
    raise RuntimeError if the last operation on `conn` did not succeed

    the operation's result dictionary is inspected; a "result" code of 0
    means success, anything else is raised with the whole dictionary as the
    exception argument
    """
    outcome = conn.result
    if outcome["result"] == 0:
        return
    raise RuntimeError(outcome)
def get_user(username) -> "ldap3.abstract.entry.Entry":
    """
    fetch LDAP user object

    username: the user's `cn`; the entry looked up is `cn=<username>,<__USERS>`

    returns the single entry found at that DN, with all of its user
    attributes loaded

    raises AssertionError if `username` is not a string or if the search does
    not yield exactly one entry
    raises RuntimeError (via `__check_ldap_result`) if the LDAP server reports
    a non-zero result code for the search
    """
    assert isinstance(username, str), "username must be a string"
    user_location = f"cn={username},{__USERS}"
    # Search only the entry at this exact DN. With the default subtree scope
    # any child entry stored beneath the user would also be returned and trip
    # the "only one user" check below.
    conn.search(
        user_location,
        "(objectClass=*)",
        search_scope=ldap3.BASE,
        attributes=["*"],
    )
    __check_ldap_result()
    assert len(conn.entries) == 1, "there should only be one user with this username"
    return conn.entries[0]
def get_user_attribute(username, attribute_name):
    """
    look up one attribute of a user in LDAP

    the user entry is fetched with `get_user`, then the attribute called
    `attribute_name` is read from it and its `.value` is returned

    raises AssertionError if either argument is not a string
    """
    assert isinstance(username, str), "username must be a string"
    assert isinstance(attribute_name, str), "attribute_name must be a string"
    entry = get_user(username)
    attribute = getattr(entry, attribute_name)
    return attribute.value
def set_user_attribute(username, attribute_name, attribute_value) -> None:
    """
    overwrite this user's attribute in LDAP
    if the value in LDAP is equal to the new value, print warning message and do nothing

    username: the user's `cn`
    attribute_name: name of the attribute to replace
    attribute_value: new value; must have the same type as the value
        currently stored, and when it is a list every element must have a
        type that already occurs among the stored elements

    raises AssertionError if an argument has the wrong type or the new value's
    type does not match what is stored in LDAP
    raises RuntimeError (via `__check_ldap_result`) if the modify fails
    """
    assert isinstance(username, str), "username must be a string"
    assert isinstance(attribute_name, str), "attribute_name must be a string"
    previous_attribute_value = get_user_attribute(username, attribute_name)
    assert type(previous_attribute_value) is type(attribute_value), "type must match type in LDAP"
    if isinstance(attribute_value, list):
        # Compare element types against the set of types already stored rather
        # than position by position, so a new list that is longer or shorter
        # than the old one is still checked without indexing past either end.
        stored_types = {type(old_item) for old_item in previous_attribute_value}
        assert all(
            type(new_item) in stored_types for new_item in attribute_value
        ), "list element types must match types in LDAP"
    if attribute_value == previous_attribute_value:
        print("new value is equal to old value, doing nothing...", file=sys.stderr)
        return
    user_location = f"cn={username},{__USERS}"
    entry_changes = {attribute_name: [(ldap3.MODIFY_REPLACE, attribute_value)]}
    conn.modify(user_location, entry_changes)
    __check_ldap_result()
def unlock_user_ssh(username) -> None:
    """
    undo `lock_user_ssh`
    if `lock_user_ssh` hasn't been done to this user, print warning message and do nothing

    username: the user's `cn`

    the `sshPublicKey` attribute may come back as a single value or as a list
    of values; in both cases only keys that start with the lock prefix are
    changed, and every other key is reported on stderr and left untouched

    raises AssertionError if `username` is not a string
    """
    assert isinstance(username, str), "username must be a string"
    pubkeys = get_user_attribute(username, "sshPublicKey")
    prefix_length = len(__LOCKED_STR)
    if not isinstance(pubkeys, list):
        pubkey = pubkeys
        # Without this guard the first bytes of a key that was never locked
        # would be chopped off and the corrupted key written back to LDAP.
        if not pubkey.startswith(__LOCKED_STR):
            print(f"skipping key {pubkey}", file=sys.stderr)
            return
        set_user_attribute(username, "sshPublicKey", pubkey[prefix_length:])
        return
    # Build a fresh list instead of editing the fetched one in place.
    unlocked_pubkeys = []
    for pubkey in pubkeys:
        if pubkey.startswith(__LOCKED_STR):
            unlocked_pubkeys.append(pubkey[prefix_length:])  # drop the lock prefix
        else:
            print(f"skipping key {pubkey}", file=sys.stderr)
            unlocked_pubkeys.append(pubkey)
    set_user_attribute(username, "sshPublicKey", unlocked_pubkeys)
def lock_user_ssh(username) -> None:
    """
    disable a user's ssh login by prefixing their `sshPublicKey` value(s)
    with the lock marker

    keys that already carry the marker are reported on stderr and left as
    they are; the attribute may hold a single key or a list of keys

    raises AssertionError if `username` is not a string
    """
    assert isinstance(username, str), "username must be a string"
    current = get_user_attribute(username, "sshPublicKey")

    if isinstance(current, list):
        # several keys: update the list slot by slot, then write it back once
        for index, key in enumerate(current):
            if key.startswith(__LOCKED_STR):
                print(f"skipping key {key}", file=sys.stderr)
            else:
                current[index] = __LOCKED_STR + key
        set_user_attribute(username, "sshPublicKey", current)
        return

    # exactly one key
    if current.startswith(__LOCKED_STR):
        print(f"skipping key {current}", file=sys.stderr)
        return
    set_user_attribute(username, "sshPublicKey", __LOCKED_STR + current)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment