Created
September 5, 2022 12:36
-
-
Save franznemeth/db3fb9aa317f1cdaa6601f6204a3a2ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kubernetes import client, config | |
import logging | |
import sys | |
import re | |
logger = logging.getLogger() | |
handler = logging.StreamHandler(sys.stdout) | |
formatter = logging.Formatter('%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s') | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
logger.setLevel(logging.INFO) | |
if __name__ == '__main__': | |
config.load_kube_config() | |
api = client.CustomObjectsApi() | |
userattributes = api.list_cluster_custom_object(group="management.cattle.io", | |
version="v3", | |
plural="userattributes", | |
watch=False) | |
# this is stupid: https://github.com/kubernetes-client/python/issues/818#issuecomment-716560405 | |
# explanation as to why the iterator looks insane | |
for userattribute in list(userattributes.items())[1][1]: | |
if not userattribute["ExtraByProvider"]: | |
user = api.get_cluster_custom_object(group="management.cattle.io", | |
version="v3", | |
plural="users", | |
name=userattribute["metadata"]["name"]) | |
principalid = user["principalIds"][0] | |
try: | |
username = re.search('CN=([^,]*)', principalid).group(1).lower().replace(" ", ".") | |
except AttributeError: | |
logger.warning(f"Could not find principalid for user with displayName: {user.get('displayName')} and username: {user.get('username')}") | |
continue | |
logger.info(f"Fixing .extraByProvider for user {userattribute['metadata']['name']} with name: {username}") | |
extra_by_provider_patch = { | |
"ExtraByProvider": { | |
"activedirectory": { | |
"principalid": [principalid], | |
"username": [username] | |
} | |
} | |
} | |
patched_userattribute = api.patch_cluster_custom_object(group="management.cattle.io", | |
version="v3", | |
name=userattribute["metadata"]["name"], | |
plural="userattributes", | |
body=extra_by_provider_patch) | |
logger.info(f"Patched {userattribute['metadata']['name']}") | |
else: | |
logger.debug(f"No action needed for user {userattribute['metadata']['name']}") |
Here my quick version of this script for Azure AD. Instead of using the ldap CN for the username it looks up the users userPrincipalName using the az cli. The az cli was the quick way instead of doing it 100% in python as I do not have a working az python setup but I did have the cli. I also built a list of Rancher users which no longer exist in Azure AD and removed them outside of this script with a kubectl delete user USER once verified it was ok to remove them.
from kubernetes import client, config
import logging
import sys
import re
import subprocess
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
usersNotFound = []
if __name__ == '__main__':
config.load_kube_config()
api = client.CustomObjectsApi()
userattributes = api.list_cluster_custom_object(group="management.cattle.io",
version="v3",
plural="userattributes",
watch=False)
# this is stupid: https://github.com/kubernetes-client/python/issues/818#issuecomment-716560405
# explanation as to why the iterator looks insane
for userattribute in list(userattributes.items())[1][1]:
if not userattribute["ExtraByProvider"]:
user = api.get_cluster_custom_object(group="management.cattle.io",
version="v3",
plural="users",
name=userattribute["metadata"]["name"])
principalid = user["principalIds"][0]
azId = principalid.split('/')[-1]
result = subprocess.run(['az', 'ad', 'user', 'show', '--id', azId, '--query', 'userPrincipalName'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if userattribute.get("username", "") in ["admin"]:
continue
if not result:
usersNotFound.append(userattribute['metadata']['name'])
logger.warning(f"Could not lookup azure id for principalid for user with displayName: {user.get('displayName')} and azid : {azId}")
continue
try:
output = result.stdout.decode('utf-8')
username = output.split('"')[1]
except:
usersNotFound.append(userattribute['metadata']['name'])
logger.warning(f"Could not find principalid for user with displayName: {user.get('displayName')} and azid : {azId}")
continue
logger.info(f"Fixing .extraByProvider for user {userattribute['metadata']['name']} with name: {username}")
extra_by_provider_patch = {
"ExtraByProvider": {
"azuread": {
"principalid": [principalid],
"username": [username]
}
}
}
patched_userattribute = api.patch_cluster_custom_object(group="management.cattle.io",
version="v3",
name=userattribute["metadata"]["name"],
plural="userattributes",
body=extra_by_provider_patch)
logger.info(f"Patched {userattribute['metadata']['name']}")
else:
logger.debug(f"No action needed for user {userattribute['metadata']['name']}")
# print out users which could not be found
print("Users which no longer exist in Azure AD")
for user in usersNotFound:
print(user)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
oneliner for keycloak: