Last active
May 31, 2024 15:44
-
-
Save darrenjrobinson/34dc0925724426823c79f46397d950b9 to your computer and use it in GitHub Desktop.
Interactive Authentication to Microsoft Graph using MSAL with Python and Delegated Permissions. See associated blogpost https://blog.darrenjrobinson.com/interactive-authentication-to-microsoft-graph-using-msal-with-python-and-delegated-permissions/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import msal | |
import jwt | |
import json | |
import sys | |
import requests | |
from datetime import datetime | |
from msal_extensions import * | |
# Script configuration and shared module-level state.

# Microsoft Azure PowerShell Client ID
clientID = '1950a258-227b-4e31-a9cf-717495945fc2'
# Delegated scope requested for the access token.
scopes = ["https://management.azure.com/user_impersonation"]

# The sign-in name doubles as the source of the tenant: the part after the
# '@' is appended to the login.microsoftonline.com authority URL below.
# Surrounding whitespace is stripped so it cannot leak into that URL.
username = input('Enter your Azure AD username: ').strip()
_upn_parts = username.split('@')
if len(_upn_parts) != 2 or not all(_upn_parts):
    # Without a single '@' separating two non-empty parts there is no domain
    # to build the authority from; stop with a clear message instead of an
    # IndexError from indexing the split result.
    sys.exit("Username must be in the form user@domain, e.g. user@contoso.com")
tenantID = _upn_parts[1]
authority = 'https://login.microsoftonline.com/' + tenantID

# Placeholders that the main flow at the bottom of the script fills in.
result = None
tokenExpiry = None
accounts = None
myAccount = None
def msal_persistence(location, fallback_to_plaintext=False):
    """Return the token-cache persistence backend for the current platform.

    Args:
        location: Cache location handed to the persistence class.
        fallback_to_plaintext: Accepted for signature compatibility; this
            function never consults it.

    Returns:
        ``FilePersistenceWithDataProtection`` when ``sys.platform`` starts
        with ``win``, ``KeychainPersistence`` when it starts with ``darwin``,
        and ``FilePersistence`` on every other platform.
    """
    platform_name = sys.platform
    if platform_name.startswith('win'):
        backend = FilePersistenceWithDataProtection(location)
    elif platform_name.startswith('darwin'):
        backend = KeychainPersistence(location, "my_service_name", "my_account_name")
    else:
        backend = FilePersistence(location)
    return backend
def msal_cache_accounts(clientID, authority):
    """List the accounts stored in the persisted MSAL token cache.

    Args:
        clientID: Application (client) ID used to build the public client.
        authority: Authority URL used to build the public client.

    Returns:
        Whatever ``PublicClientApplication.get_accounts()`` returns for the
        cache kept in ``token_cache.bin``.
    """
    store = msal_persistence("token_cache.bin")
    # Report whether the chosen persistence backend encrypts the cache.
    print("Is this MSAL persistence cache encrypted?", store.is_encrypted)
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(store),
    )
    return client.get_accounts()
def msal_delegated_refresh(clientID, scopes, authority, account):
    """Try to obtain a token for *account* silently from the persisted cache.

    Args:
        clientID: Application (client) ID used to build the public client.
        scopes: Scopes requested for the token.
        authority: Authority URL used to build the public client.
        account: Cached account entry to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``.
    """
    token_cache = PersistedTokenCache(msal_persistence("token_cache.bin"))
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=token_cache,
    )
    return client.acquire_token_silent_with_error(scopes=scopes, account=account)
def msal_delegated_refresh_force(clientID, scopes, authority, account):
    """Silently acquire a token for *account* with ``force_refresh=True``.

    Args:
        clientID: Application (client) ID used to build the public client.
        scopes: Scopes requested for the token.
        authority: Authority URL used to build the public client.
        account: Cached account entry to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``.
    """
    store = msal_persistence("token_cache.bin")
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(store),
    )
    silent_kwargs = {"scopes": scopes, "account": account, "force_refresh": True}
    return client.acquire_token_silent_with_error(**silent_kwargs)
def msal_delegated_interactive_flow(scopes, prompt=None, login_hint=None, domain_hint=None, claims_challenge=None, timeout=None, port=None, extra_scopes_to_consent=None):
    """Run the browser-based interactive sign-in and return the MSAL result.

    Builds a public client from the module-level ``clientID`` and
    ``authority`` with the persisted token cache in ``token_cache.bin``, then
    calls ``acquire_token_interactive``.

    Args:
        scopes: Scopes requested for the token.
        prompt: Forwarded to ``acquire_token_interactive``.
        login_hint: Forwarded to ``acquire_token_interactive``.
        domain_hint: Forwarded to ``acquire_token_interactive``.
        claims_challenge: Forwarded to ``acquire_token_interactive``.
        timeout: Forwarded to ``acquire_token_interactive``.
        port: Forwarded to ``acquire_token_interactive``.
        extra_scopes_to_consent: Forwarded to ``acquire_token_interactive``.

    Returns:
        The value returned by ``acquire_token_interactive``.
    """
    print("Initate an Interactive Flow (auth via Browser) to get AAD Access and Refresh Tokens.")
    persistence = msal_persistence("token_cache.bin")
    cache = PersistedTokenCache(persistence)
    app = msal.PublicClientApplication(client_id=clientID, authority=authority, token_cache=cache)
    # Forward every option the caller supplied. Previously `prompt` was
    # hard-coded to None and the last four parameters were silently ignored.
    # All of them default to None, so existing callers behave as before.
    return app.acquire_token_interactive(
        scopes=scopes,
        prompt=prompt,
        login_hint=login_hint,
        domain_hint=domain_hint,
        claims_challenge=claims_challenge,
        timeout=timeout,
        port=port,
        extra_scopes_to_consent=extra_scopes_to_consent,
    )
def msal_jwt_expiry(accessToken):
    """Print and return the expiry time carried in an access token.

    The token is decoded only to read its claims; nothing is validated, so
    the result must not be used to make a trust decision.

    Args:
        accessToken: Encoded JWT access token.

    Returns:
        A ``datetime`` built with ``datetime.fromtimestamp`` from the token's
        ``exp`` claim.

    Raises:
        KeyError: If the decoded token has no ``exp`` claim.
    """
    # `verify=False` is the removed PyJWT 1.x spelling; on PyJWT 2.x the call
    # fails because signature verification stays enabled and no algorithms
    # are given. Disabling the checks through `options` works on both major
    # versions. The claim checks are switched off explicitly so that an
    # expired token, or one with an audience, can still be inspected.
    decode_options = {
        "verify_signature": False,
        "verify_exp": False,
        "verify_nbf": False,
        "verify_iat": False,
        "verify_aud": False,
    }
    decodedAccessToken = jwt.decode(accessToken, options=decode_options)
    # Token Expiry
    tokenExpiry = datetime.fromtimestamp(int(decodedAccessToken['exp']))
    print("Token Expires at: " + str(tokenExpiry))
    return tokenExpiry
def msgraph_request(resource, requestHeaders):
    """Issue a GET request and return the decoded JSON body.

    Args:
        resource: URL to request.
        requestHeaders: Headers sent with the request.

    Returns:
        The result of calling ``.json()`` on the response.
    """
    response = requests.get(resource, headers=requestHeaders)
    return response.json()
# Main flow: reuse a cached account when possible, otherwise sign in
# interactively, then use the access token to list the user's tenants.

accounts = msal_cache_accounts(clientID, authority)

# Find the cached account for the user who was prompted for. The search
# finishes before any sign-in is attempted, so unrelated cached accounts can
# no longer trigger an interactive flow of their own.
myAccount = next(
    (
        cachedAccount
        for cachedAccount in (accounts or [])
        if cachedAccount.get('username') == username
    ),
    None,
)

if myAccount is not None:
    print("Found account in MSAL Cache: " + myAccount['username'])
    print("Attempting to obtain a new Access Token using the Refresh Token")
    result = msal_delegated_refresh(clientID, scopes, authority, myAccount)
    # A silent attempt may yield nothing or a result without a token; in both
    # cases fall back to the interactive flow.
    if not result or "access_token" not in result:
        # Get a new Access Token using the Interactive Flow
        print("Interactive Authentication required to obtain a new Access Token.")
        result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)
else:
    # User not found in MSAL Cache (or the cache is empty)
    # Trigger interactive authentication flow
    print("First authentication for " + username)
    result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)

if result and result.get("access_token"):
    msal_jwt_expiry(result["access_token"])
    # Query the Azure management API for the tenants the user can access
    requestHeaders = {'Authorization': 'Bearer ' + result["access_token"],'Content-Type': 'application/json'}
    queryResults = msgraph_request("https://management.azure.com/tenants?api-version=2020-01-01",requestHeaders)
    print(json.dumps(queryResults, indent=2))
else:
    # No token was issued: report what is available and exit non-zero
    # instead of failing later with a KeyError on "access_token".
    failure = result or {}
    sys.exit(
        "Authentication failed: "
        + str(failure.get("error"))
        + " - "
        + str(failure.get("error_description"))
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment