Created
February 19, 2024 20:58
-
-
Save jarulsamy/d6fc0ed7cfdd3143428f67be6b851ef9 to your computer and use it in GitHub Desktop.
Syncing groups and users from IdM to Keycloak.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
kc-sync - Sync existing Keycloak users' group memberships with IdM.

This script:
  1. Creates all groups from IdM in Keycloak.
  2. Queries Keycloak for all users within the realm.
  3. Finds the corresponding user within IdM.
  4. Calculates which groups a given user should be added to/removed from.
  5. Either shows the changes to be made or performs the previously calculated transactions.
""" | |
import argparse | |
import logging | |
import sys | |
from pprint import pprint | |
import keycloak | |
import urllib3 | |
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection | |
from python_freeipa import ClientMeta | |
import os | |
from pathlib import Path | |
import configparser | |
# Root logging configuration: every record is rendered as
# "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT)

# Script-wide logger; its level is set later in main() from the -v count.
logger = logging.getLogger("kc-sync")
def idm_get_user_info(ipa, username):
    """Get information about a user from IdM.

    Args:
        ipa: IdM client; only its ``user_find`` method is called.
        username: Login name, passed to ``user_find`` as ``o_uid``.

    Returns:
        The first (and only) entry of the ``result`` list in the
        ``user_find`` response.

    Raises:
        ValueError: If more than one IdM user matches ``username``.
        LookupError: If no IdM user matches ``username``.
    """
    users = ipa.user_find(o_uid=username)

    if users["count"] > 1:
        msg = f"Found multiple users with same username: {username}"
        logger.error(msg)
        raise ValueError(msg)

    matches = users["result"]
    if not matches:
        # Fail with a message that names the user instead of letting the
        # indexing below die with an anonymous "list index out of range".
        raise LookupError(f"User not found in IdM: {username}")

    logger.debug("Found valid user %s within IdM", username)
    return matches[0]
def keycloak_create_group(kcadm, group_name):
    """Create a group if it does not exist, otherwise ignore.

    Args:
        kcadm: Keycloak admin client; only ``create_group`` is called.
        group_name: Name of the group to create.

    Returns:
        Whatever ``kcadm.create_group`` returns, or ``None`` when Keycloak
        rejected the request with a ``KeycloakPostError``.
    """
    try:
        group = kcadm.create_group({"name": group_name}, skip_exists=True)
    except keycloak.exceptions.KeycloakPostError as err:
        # Keep going with the remaining groups, but record why this one failed.
        logger.error("Failed to create group %s in Keycloak: %s", group_name, err)
        return None

    if group is None:
        # NOTE(review): with skip_exists=True python-keycloak appears to return
        # None for a group that already exists - confirm against the installed
        # version. Any other return value is treated as a fresh creation.
        logger.debug("Group %s already exists in Keycloak", group_name)
    else:
        logger.info("Created group %s in Keycloak", group_name)
    return group
def keycloak_add_user_to_groups(kcadm, username, user_id, groups):
    """Add a Keycloak user to every group named in ``groups``.

    Args:
        kcadm: Keycloak admin client used for the group lookup and the
            membership call.
        username: Used only in the log message.
        user_id: Keycloak id of the user to add.
        groups: Iterable of group names.
    """
    for group_name in groups:
        # HACK: Every group is assumed to be a top-level group, so its path is
        # just "/<name>". This will need to change if we ever need complicated
        # group hierarchies.
        target = kcadm.get_group_by_path(f"/{group_name}")
        kcadm.group_user_add(user_id, target["id"])
        logger.info("Added user %s to group %s", username, target["name"])
def keycloak_remove_user_from_groups(kcadm, username, user_id, groups):
    """Remove a Keycloak user from every group named in ``groups``.

    Args:
        kcadm: Keycloak admin client used for the group lookup and the
            membership call.
        username: Used only in the log message.
        user_id: Keycloak id of the user to remove.
        groups: Iterable of group names.
    """
    for group_name in groups:
        # HACK: Every group is assumed to be a top-level group, so its path is
        # just "/<name>". This will need to change if we ever need complicated
        # group hierarchies.
        target = kcadm.get_group_by_path(f"/{group_name}")
        kcadm.group_user_remove(user_id, target["id"])
        logger.info("Removed user %s from group %s", username, target["name"])
def _idm_group_names(ipa):
    """Return the distinct IdM group names, in first-seen order."""
    logger.debug("Retrieving IdM groups")
    # NOTE(review): a limit of 0 presumably lifts the server-side time/size
    # caps - confirm against the IdM API.
    limits = {"o_timelimit": 0, "o_sizelimit": 0}
    queries = (
        ipa.group_find(o_posix=True, **limits),
        ipa.group_find(o_posix=False, **limits),
        ipa.group_find(o_external=True, **limits),
    )

    # The result sets may overlap; a dict keeps one entry per name while
    # preserving the order in which names were first seen.
    names = {}
    for query in queries:
        for group in query["result"]:
            name = group["cn"][0]
            if name in names:
                continue
            if len(group["cn"]) > 1:
                logger.warning("Found IdM group with multiple CNs!")
            names[name] = None
    return list(names)


def _print_dry_run(keycloak_username, idm_uid, to_remove, to_add):
    """Print the membership changes a real run would make for one user."""
    print("=" * 80)
    print(f"Keycloak User: {keycloak_username}")
    print(f"IdM User: {idm_uid}")
    print("Keycloak group memberships to remove user from:")
    for name in to_remove:
        print(f" {name}")
    print("Keycloak group memberships to add user to:")
    for name in to_add:
        print(f" {name}")
    print()


def sync_group_from_idm_to_keycloak(kcadm, ipa, dry_run=False):
    """Mirror IdM groups and group memberships into Keycloak.

    Every IdM group is created in Keycloak (unless ``dry_run``), then each
    Keycloak user except ``admin`` is looked up in IdM and their Keycloak
    group memberships are changed to match ``memberof_group`` from IdM.

    Args:
        kcadm: Keycloak admin client.
        ipa: Logged-in IdM client.
        dry_run: When true, print the planned changes instead of applying
            them; Keycloak is not modified.

    Raises:
        ValueError: Propagated from ``idm_get_user_info`` when several IdM
            users share one username.
    """
    # Create any groups that are in IdM but not in Keycloak.
    # This makes manual creation of RBAC rules a lot easier.
    group_names = _idm_group_names(ipa)
    if not dry_run:
        for group_name in group_names:
            keycloak_create_group(kcadm, group_name)

    for user in kcadm.get_users({}):
        username = user["username"]
        if username == "admin":
            # Skip admin
            continue

        try:
            idm_user = idm_get_user_info(ipa, username)
        except LookupError:
            # A Keycloak account without an IdM counterpart must not abort the
            # sync for everybody else; leave its memberships untouched.
            logger.warning("Keycloak user %s not found in IdM, skipping", username)
            continue

        # IdM omits the attribute entirely for a user who is in no group.
        idm_memberships = set(idm_user.get("memberof_group", ()))

        keycloak_user_id = user["id"]
        keycloak_memberships = {
            group["name"] for group in kcadm.get_user_groups(user_id=keycloak_user_id)
        }

        # Set difference in both directions; sorted so that the API calls and
        # the dry-run report come out in a stable order.
        groups_to_remove_membership = sorted(keycloak_memberships - idm_memberships)
        groups_to_add_membership = sorted(idm_memberships - keycloak_memberships)

        # Side note, if you are wondering why this does not just use
        # kcadm.update_user(): there is a bug in the upstream Keycloak API that
        # doesn't respect the groups field within the payload. Thus we have to
        # compute these differences on our end and make a separate API call
        # for each group membership change.
        if not dry_run:
            logger.info("Commiting transactions")
            keycloak_remove_user_from_groups(
                kcadm,
                username,
                keycloak_user_id,
                groups_to_remove_membership,
            )
            keycloak_add_user_to_groups(
                kcadm,
                username,
                keycloak_user_id,
                groups_to_add_membership,
            )
            continue

        # Dry run, just print the info
        logger.info("DRY-RUN, just printing details")
        _print_dry_run(
            username,
            idm_user["uid"][0],
            groups_to_remove_membership,
            groups_to_add_membership,
        )
def build_parser():
    """Build the CLI parser.

    Returns:
        An ``argparse.ArgumentParser`` that understands ``--version``,
        ``-v/--verbose`` (a repeat counter, default 0) and ``-n/--dry-run``.
    """
    cli = argparse.ArgumentParser(description="IdM to Keycloak Group Sync Tool")

    # (flags, add_argument keyword arguments), registered in this order.
    option_specs = (
        (
            ("--version",),
            {"action": "version", "version": "%(prog)s 1.0"},
        ),
        (
            ("-v", "--verbose"),
            {
                "action": "count",
                "default": 0,
                "help": "increase logging verbosity, can be used 3 times",
            },
        ),
        (
            ("-n", "--dry-run"),
            {
                "action": "store_true",
                "help": "print which actions would occur, but don't actually modify Keycloak",
            },
        ),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli
def load_config(path=Path("./kc-sync.conf")):
    """Load config from a file, with environment variables taking priority.

    Each option is resolved in this order:

        (highest priority)
        ENV VAR       ``KCS_<SECTION>_<OPTION>``, e.g. ``KCS_IPA_PASSWORD``
        CONFIG FILE   ``[<SECTION>]`` / ``<option>`` in ``path``
        (lowest priority)

    Command-line flags are not consulted by this function.

    If ``path`` does not exist, a template config file is written there
    (readable by its owner only, since it holds credentials) and the
    process exits with status 0 so the user can fill it in.

    Args:
        path: Location of the config file, as a ``Path`` or a string.

    Returns:
        A dict with the sections ``"KEYCLOAK"`` (``server_url``,
        ``username``, ``password``, ``realm_name``) and ``"IPA"``
        (``server_url``, ``username``, ``password``).

    Raises:
        KeyError: If an option is set neither in the environment nor in
            the config file.
    """
    path = Path(path)

    # Template values; the keys also define which options get resolved below.
    defaults = {
        "KEYCLOAK": {
            "server_url": "https://arcccloak.example.com",
            "username": "username",
            "password": "password",
            "realm_name": "test-realm",
        },
        "IPA": {
            "server_url": "idm.server.example.com",
            "username": "username",
            "password": "password",
        },
    }

    config = configparser.ConfigParser()
    if not path.is_file():
        config.read_dict(defaults)
        # The file is going to contain passwords: create it with mode 0600
        # rather than with whatever the umask would allow.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            config.write(f)
        print(f"Created default config file at {path.absolute()}")
        print("Please modify it and rerun")
        sys.exit(0)

    config.read(path)

    result = {}
    for section, options in defaults.items():
        resolved = {}
        for option in options:
            env_name = f"KCS_{section}_{option.upper()}"
            value = os.getenv(env_name)
            if value is None:
                # Only fall back to the file when the environment is silent,
                # so a key missing from the file is fine if the env var is set.
                value = config.get(section, option, fallback=None)
            if value is None:
                raise KeyError(
                    f"Missing config option [{section}] {option}: "
                    f"set it in {path} or via {env_name}"
                )
            resolved[option] = value
        result[section] = resolved
    return result
def main():
    """Parse the CLI, connect to Keycloak and IdM, and run the group sync.

    Returns:
        0, which the script entry point passes to ``sys.exit``.
    """
    args = build_parser().parse_args()
    config = load_config()
    keycloak_conf = config["KEYCLOAK"]
    ipa_conf = config["IPA"]

    # Setup logger: no -v means errors only, each extra -v lowers the
    # threshold by one step, bottoming out at DEBUG.
    if args.verbose < 1:
        # Disable insecure cert warning.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        level = logging.ERROR
    else:
        steps = (logging.WARNING, logging.INFO, logging.DEBUG)
        level = steps[min(args.verbose, len(steps)) - 1]
    logger.setLevel(level)

    # Log into keycloak. The account is looked up in the "master" realm
    # (user_realm_name) while the work targets the configured realm_name.
    kcadm = KeycloakAdmin(
        connection=KeycloakOpenIDConnection(
            server_url=keycloak_conf["server_url"],
            username=keycloak_conf["username"],
            password=keycloak_conf["password"],
            user_realm_name="master",
            realm_name=keycloak_conf["realm_name"],
            verify=True,
        )
    )

    # Log into IPA
    # NOTE(review): verify_ssl=False - certificate verification looks to be
    # switched off for the IdM connection, unlike Keycloak above; confirm this
    # is intentional.
    ipa = ClientMeta(ipa_conf["server_url"], verify_ssl=False)
    ipa.login(ipa_conf["username"], ipa_conf["password"])

    sync_group_from_idm_to_keycloak(kcadm, ipa, args.dry_run)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment