Last active
March 14, 2024 02:29
-
-
Save andermoran/83e4f4e4fd1b50bb70a20ca6506217d6 to your computer and use it in GitHub Desktop.
entra_admin_assistant_main
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import json | |
import subprocess | |
import secrets | |
import re | |
import requests | |
import base64 | |
import os | |
from pyperclip import copy | |
# Helpers | |
def decode_base64(encoded_string): | |
# Decode the Base64 string | |
decoded_bytes = base64.b64decode(encoded_string) | |
# Convert bytes to string | |
decoded_string = decoded_bytes.decode('utf-8') | |
return decoded_string | |
def parse_arguments(json_obj): | |
arguments = json_obj.get("arguments", {}) | |
return arguments | |
def extract_value_from_user(output, key): | |
pattern_string = r"{}:\s*(.*?)\n".format(key) | |
pattern = re.compile(pattern_string) | |
match = re.search(pattern, output) | |
if match: | |
upn = match.group(1) | |
if upn != "": | |
return upn | |
else: | |
return None | |
else: | |
return None | |
def generate_password(): | |
return ''.join(secrets.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ?!.+-0123456789') for i in range(16)) | |
def handle_powershell_commands(commands): | |
try: | |
script_block = "& {" + "; ".join(commands) + "}" | |
# print("Executing powershell the following commands:", script_block) | |
result = subprocess.run(["powershell", "-Command", script_block], capture_output=True, text=True, check=True) | |
return result.stdout.strip() | |
except subprocess.CalledProcessError as e: | |
# If an error occurs, print the error message and return None | |
print("Error executing PowerShell command:", e) | |
return None | |
# Utility | |
def get_action(json_obj): | |
return json_obj["action"] | |
def get_domain(json_obj): | |
try: | |
return json_obj["domain"] | |
except KeyError as e: | |
return None | |
def get_tenant_id(domain): | |
if domain == None or domain == "": | |
return None | |
url = f"https://login.microsoftonline.com/{domain}/v2.0/.well-known/openid-configuration" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
data = response.json() | |
return data.get('issuer').split('/')[-2] | |
except requests.RequestException as e: | |
# print("Error:", e) | |
return None | |
def clean_input(input): | |
pattern_ea = r'EA\s' | |
cleaned_string = re.sub(pattern_ea, '', input) | |
pattern_eaabot = r'EAABot\s' | |
cleaned_string = re.sub(pattern_eaabot, '', cleaned_string) | |
pattern_time = r'\[\d.*\]\s' | |
cleaned_string = re.sub(pattern_time, '', cleaned_string) | |
return cleaned_string.strip() | |
def handle_action(action, args, tenant_id, domain): | |
if action == "createUser": | |
return create_user(args, tenant_id, domain) | |
elif action == "resetPassword": | |
return reset_password(args, tenant_id) | |
elif action == "resetMfa": | |
return reset_mfa(args, tenant_id) | |
else: | |
return "Invalid Option" | |
def handle_input(raw_args): | |
cleaned_input = clean_input(raw_args) | |
json_args = None | |
try: | |
json_args = json.loads(cleaned_input) | |
except Exception: | |
try: | |
decoded_input = decode_base64(cleaned_input) | |
json_args = json.loads(decoded_input) | |
except Exception: | |
# in case user pastes from teams and it copies stuff we dont need | |
if "EAABot" in raw_args: | |
os.system('cls') | |
return | |
print("Invalid input: {}".format(raw_args)) | |
print("Input received:", json_args) | |
user_input = input("Type 'go' to proceed with action or 'cancel' to exit: ") | |
while user_input.lower() not in ["go", "cancel"]: | |
user_input = input("Invalid input. Type 'go' to proceed with action or 'cancel' to exit: ") | |
if user_input.lower() == "cancel": | |
sys.exit(0) | |
action = get_action(json_args) | |
domain = get_domain(json_args) | |
tenant_id = get_tenant_id(domain) | |
if tenant_id is not None: | |
print("Please sign in to admin of {} - {}".format(domain, tenant_id)) | |
args = parse_arguments(json_args) | |
message = handle_action(action, args, tenant_id, domain) | |
if type(message) == str: | |
copy(message) | |
print("{} <-- this was copied to your clipboard".format(message)) | |
# Command generators | |
def generate_create_user_commands(args, password, tenant_id, domain): | |
powershell_commands = [ | |
'Connect-AzureAD' | |
] | |
if tenant_id is not None: | |
powershell_commands.append('Connect-AzureAD -TenantId {}'.format(tenant_id)) | |
else: | |
powershell_commands.append('Connect-AzureAD') | |
if 'DisplayName' not in args: | |
raise ValueError("DisplayName is required") | |
if 'MailNickname' not in args: | |
raise ValueError("MailNickname is required") | |
command_string = 'New-AzureADUser' | |
command_string += ' -DisplayName "{}" -MailNickname "{}" -UserPrincipalName "{}@{}"'.format(args['DisplayName'], args['MailNickname'], args['MailNickname'], domain) | |
command_string += ' -PasswordProfile (New-Object -TypeName Microsoft.Open.AzureAD.Model.PasswordProfile -Property @{{Password="{0}"}})'.format(password) | |
#VERIFY WORKING END TO END EVERY FIELD!! | |
if 'UserType' in args: | |
command_string += ' -UserType "{}"'.format(args['UserType']) | |
if 'AccountEnabled' in args: | |
command_string += ' -AccountEnabled ${}'.format("true" if args['AccountEnabled'] else "false") | |
else: | |
command_string += ' -AccountEnabled $true' | |
if 'GivenName' in args: | |
command_string += ' -GivenName "{}"'.format(args['GivenName']) | |
if 'Surname' in args: | |
command_string += ' -Surname "{}"'.format(args['Surname']) | |
if 'UsageLocation' in args: | |
command_string += ' -UsageLocation "{}"'.format(args['UsageLocation']) | |
powershell_commands.append(command_string) | |
powershell_commands.append('$user = Get-AzureADUser -ObjectId "{}@$(Get-AzureADDomain | Select-Object -ExpandProperty Name)"'.format(args['MailNickname'])) | |
powershell_commands.append('Write-Host $user') | |
return powershell_commands | |
def generate_reset_password_commands(args, password, tenant_id): | |
powershell_commands = [ | |
'Import-Module AzureAD', | |
] | |
if tenant_id is not None: | |
powershell_commands.append('Connect-AzureAD -TenantId {}'.format(tenant_id)) | |
else: | |
powershell_commands.append('Connect-AzureAD') | |
if 'Alias' not in args: | |
raise ValueError("Alias is required") | |
powershell_commands.append('$user = Get-AzureADUser -ObjectId "{}@$(Get-AzureADDomain | Select-Object -ExpandProperty Name)"'.format(args['Alias'])) | |
powershell_commands.append('Write-Host $user') | |
powershell_commands.append('$newPassword = ConvertTo-SecureString -String "{}" -AsPlainText -Force'.format(password)) | |
powershell_commands.append('Set-AzureADUserPassword -ObjectId $user.ObjectId -Password $newPassword -ForceChangePasswordNextLogin $true') | |
return powershell_commands | |
def generate_reset_mfa_commands(args, tenant_id): | |
powershell_commands = [ | |
'Import-Module MSOnline', | |
'Import-Module AzureAD', | |
] | |
if tenant_id is not None: | |
powershell_commands.append('Connect-AzureAD -TenantId {}'.format(tenant_id)) | |
else: | |
powershell_commands.append('Connect-AzureAD') | |
powershell_commands.append('Connect-MsolService') | |
if 'Alias' not in args: | |
raise ValueError("Alias is required") | |
powershell_commands.append('$user = Get-AzureADUser -ObjectId "{}@$(Get-AzureADDomain | Select-Object -ExpandProperty Name)"'.format(args['Alias'])) | |
powershell_commands.append('Write-Host $user') | |
powershell_commands.append('Reset-MsolStrongAuthenticationMethodByUpn -UserPrincipalName "{}@$(Get-AzureADDomain | Select-Object -ExpandProperty Name)"'.format(args['Alias'])) | |
return powershell_commands | |
# Actions | |
def create_user(args, tenant_id, domain): | |
password = generate_password() | |
powershell_commands = generate_create_user_commands(args, password, tenant_id, domain) | |
output = handle_powershell_commands(powershell_commands) | |
print(powershell_commands) | |
print(output) | |
upn = extract_value_from_user(output, "UserPrincipalName") | |
if upn != None: | |
print("Created user with email and password:") | |
return "{} - {}".format(upn, password) | |
else: | |
print("Unable to create user. Check if the user already exists") | |
def reset_password(args, tenant_id): | |
password = generate_password() | |
powershell_commands = generate_reset_password_commands(args, password, tenant_id) | |
output = handle_powershell_commands(powershell_commands) | |
upn = extract_value_from_user(output, "UserPrincipalName") | |
forceChangePassword = extract_value_from_user(output, "ForceChangePasswordNextLogin") | |
if upn != None and forceChangePassword == "True": | |
print("Reset password for user. New login credentials are:") | |
return "{} - {}".format(upn, password) | |
else: | |
print("Unable to reset password for user") | |
def reset_mfa(args, tenant_id): | |
powershell_commands = generate_reset_mfa_commands(args, tenant_id) | |
output = handle_powershell_commands(powershell_commands) | |
upn = extract_value_from_user(output, "UserPrincipalName") | |
if upn != None: | |
print("Reset MFA for user: {}".format(upn)) | |
else: | |
print("Unable to reset MFA for user") | |
# Main | |
def run(): | |
# If not JSON passed at command line, prompt for input again. Intentionally, keep asking for inputs | |
while len(sys.argv) < 2: | |
new_input = input("Enter JSON object or base64 string: ") | |
try: | |
handle_input(new_input) | |
except json.JSONDecodeError: | |
try: | |
decoded_input = decode_base64(new_input) | |
handle_input(decoded_input) | |
except Exception: | |
print("Invalid input") | |
print("Regular argument:", arg) | |
for arg in sys.argv[1:]: | |
try: | |
handle_input(arg) | |
except json.JSONDecodeError: | |
try: | |
decoded_input = decode_base64(arg) | |
handle_input(decoded_input) | |
except Exception: | |
print("Invalid input") | |
print("Regular argument:", arg) | |
def main(): | |
while len(sys.argv) < 2: | |
new_input = input("Enter JSON object or base64 string: ") | |
try: | |
handle_input(new_input) | |
except json.JSONDecodeError: | |
try: | |
decoded_input = decode_base64(new_input) | |
handle_input(decoded_input) | |
except Exception: | |
print("Invalid input") | |
print("Regular argument:", arg) | |
for arg in sys.argv[1:]: | |
try: | |
handle_input(arg) | |
except json.JSONDecodeError: | |
try: | |
decoded_input = decode_base64(arg) | |
handle_input(decoded_input) | |
except Exception: | |
print("Invalid input") | |
print("Regular argument:", arg) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment