Created
April 5, 2021 13:22
-
-
Save vskrachkov/2d0fdf7cd62d170af777a570026a1ab1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterable, Tuple, Type, Optional, List | |
from django.conf import settings | |
from django.contrib.auth.models import Permission | |
from django.contrib.contenttypes.models import ContentType | |
from django.db.models import Model | |
from keycloak.admin.clientroles import ClientRoles | |
from keycloak.exceptions import KeycloakClientError | |
from keycloak.openid_connect import KeycloakOpenidConnect | |
from keycloak.realm import KeycloakRealm | |
from keycloak.uma import KeycloakUMA | |
def do_sync() -> None: | |
realm = _get_realm() | |
oidc_client = _get_oidc_client(realm) | |
# get access token | |
resp = oidc_client.client_credentials() | |
access_token = resp["access_token"] | |
print(resp) | |
uma_client = _get_uma_client(realm) | |
resources = uma_client.resource_set_list(access_token) | |
for resource_id in resources: | |
print(uma_client.resource_set_read(access_token, resource_id)) | |
# _sync_resources(access_token, uma_client) | |
_sync_roles(realm) | |
def _sync_resources(access_token: str, uma_client: KeycloakUMA) -> None: | |
for ct, permissions in _get_content_types_with_permissions(): | |
resource_name = ct.name | |
try: | |
uma_client.resource_set_create( | |
access_token, | |
resource_name, | |
scopes=permissions, | |
_id=resource_name, | |
ownerManagedAccess=True, | |
) | |
except KeycloakClientError as exc: | |
if exc.original_exc.response.status_code == 409: | |
uma_client.resource_set_update( | |
access_token, | |
id=resource_name, | |
name=resource_name, | |
scopes=permissions, | |
) | |
def _sync_roles(realm: KeycloakRealm) -> None: | |
roles = _get_client_roles(realm) | |
try: | |
for perm in _get_all_permissions(): | |
roles.create(perm.codename, description=perm.name) | |
except KeycloakClientError as exc: | |
print(exc.original_exc) | |
def _get_client_roles(realm: KeycloakRealm) -> ClientRoles: | |
return ( | |
realm.admin.realms.by_name(realm.realm_name) | |
.clients.by_id(settings.KEYCLOAK_CLIENT_ID) | |
.roles | |
) | |
def _get_uma_client(realm: KeycloakRealm) -> KeycloakUMA: | |
return realm.uma() | |
def _get_oidc_client(realm: KeycloakRealm) -> KeycloakOpenidConnect: | |
return realm.open_id_connect( | |
client_id=settings.KEYCLOAK_CLIENT_ID, | |
client_secret=settings.KEYCLOAK_CLIENT_SECRET, | |
) | |
def _get_realm() -> KeycloakRealm: | |
return KeycloakRealm( | |
server_url=settings.KEYCLOAK_URL, | |
realm_name=settings.KEYCLOAK_REALM, | |
) | |
def _get_all_permissions() -> Iterable[Permission]: | |
return Permission.objects.all() | |
def _get_content_types_with_permissions() -> Iterable[Tuple[ContentType, List[str]]]: | |
for ct in ContentType.objects.all(): | |
model_cls: Optional[Type[Model]] = ct.model_class() | |
if model_cls: | |
model_permissions = model_cls._meta.permissions + list( | |
model_cls._meta.default_permissions | |
) | |
yield (ct, model_permissions) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment