Created
June 16, 2025 11:15
-
-
Save stewartshea/d36c6f1dff2286bb1fcd34a22943587f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Azure Resource Dependency Graph Generator | |
This script generates a visual dependency graph of Azure resources in a subscription | |
using the Azure Resource Graph service. It can output either a Mermaid diagram or | |
an interactive HTML page with D3.js visualization. | |
""" | |
import argparse | |
import json | |
import subprocess | |
import sys | |
import os | |
import time | |
from typing import Dict, List, Set, Tuple, Optional, Any, Callable, TypeVar | |
from functools import wraps | |
# Import the visualization modules | |
sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
# Global list to collect non-fatal errors and warnings | |
_failed_operations_log: List[str] = [] | |
# Type variable for decorator | |
F = TypeVar('F', bound=Callable[..., Any]) | |
def log_failure(message: str): | |
"""Logs a failure message that allows the script to continue.""" | |
global _failed_operations_log | |
_failed_operations_log.append(message) | |
print(f"Warning: {message}", file=sys.stderr) | |
def truncate_subscription_id(subscription_id: str) -> str: | |
""" | |
Truncate subscription ID to first 10 characters plus ellipsis. | |
Args: | |
subscription_id: The full Azure subscription ID | |
Returns: | |
Truncated subscription ID | |
""" | |
return subscription_id[:10] + "..." | |
def retry_with_backoff(max_retries: int = 3, initial_delay: float = 1.0, backoff_factor: float = 2.0) -> Callable[[F], F]: | |
""" | |
A decorator to retry a function with exponential backoff. | |
Args: | |
max_retries: Maximum number of times to retry the function. | |
initial_delay: Initial delay in seconds before the first retry. | |
backoff_factor: Factor by which the delay increases for each retry. | |
""" | |
def decorator(func: F) -> F: | |
@wraps(func) | |
def wrapper(*args: Any, **kwargs: Any) -> Any: | |
delay = initial_delay | |
for i in range(max_retries + 1): | |
try: | |
return func(*args, **kwargs) | |
except subprocess.CalledProcessError as e: | |
# Check for throttling or other retryable errors | |
stderr_output = e.stderr.decode() if e.stderr else "" | |
if "RateLimiting" in stderr_output or "TooManyRequests" in stderr_output or "Throttled" in stderr_output: | |
log_failure(f"Attempt {i+1}/{max_retries+1}: Command failed due to throttling. Retrying in {delay:.2f}s. Command: {args[0] if args else 'N/A'}") | |
if i < max_retries: | |
time.sleep(delay) | |
delay *= backoff_factor | |
else: | |
log_failure(f"Max retries reached for command: {args[0] if args else 'N/A'}. Giving up.") | |
raise # Re-raise the last exception if max retries reached | |
else: | |
# For non-retryable errors, re-raise immediately | |
raise | |
except Exception as e: | |
log_failure(f"Attempt {i+1}/{max_retries+1}: An unexpected error occurred. Retrying in {delay:.2f}s. Error: {e}") | |
if i < max_retries: | |
time.sleep(delay) | |
delay *= backoff_factor | |
else: | |
log_failure(f"Max retries reached for unexpected error. Giving up.") | |
raise | |
return None # Should not be reached | |
return wrapper | |
return decorator | |
@retry_with_backoff() | |
def run_az_command(command: str) -> Optional[Any]: | |
""" | |
Run an Azure CLI command and return its output (JSON parsed or raw string). | |
Args: | |
command: The Azure CLI command to run | |
Returns: | |
The parsed JSON output (dict/list), raw string output, or None if an error occurs. | |
""" | |
try: | |
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
if result.stdout.strip(): | |
try: | |
return json.loads(result.stdout) | |
except json.JSONDecodeError: | |
# If not valid JSON, return the raw string output | |
return result.stdout.strip() | |
return None # Command ran successfully but returned no output | |
except subprocess.CalledProcessError as e: | |
# This will be caught by the decorator's exception handling | |
raise | |
except Exception as e: # Catch any other unexpected errors | |
# This will be caught by the decorator's exception handling | |
raise | |
@retry_with_backoff() | |
def run_az_command_list(command: List[str], subscription_id: Optional[str] = None) -> Dict[str, Any]: | |
""" | |
Run an Azure CLI command from a list and return the JSON output. | |
Args: | |
command: The Azure CLI command as a list | |
subscription_id: Optional subscription ID | |
Returns: | |
The parsed JSON output of the command | |
""" | |
full_command = list(command) # Create a copy to avoid modifying the original list | |
try: | |
if subscription_id: | |
full_command.extend(['--subscription', subscription_id]) | |
result = subprocess.run( | |
full_command, | |
capture_output=True, | |
text=True, | |
check=True | |
) | |
if result.stdout.strip(): | |
return json.loads(result.stdout) | |
return {} | |
except subprocess.CalledProcessError as e: | |
# This will be caught by the decorator's exception handling | |
raise | |
except json.JSONDecodeError as e: | |
log_failure(f"Error parsing JSON response from command {' '.join(full_command)}: {e}") | |
return {} | |
except Exception as e: | |
# This will be caught by the decorator's exception handling | |
raise | |
def check_az_cli_installed() -> bool: | |
"""Check if Azure CLI is installed.""" | |
try: | |
subprocess.run(["az", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) | |
return True | |
except (subprocess.CalledProcessError, FileNotFoundError): | |
return False | |
def check_az_cli_logged_in() -> bool: | |
"""Check if the user is logged in to Azure CLI.""" | |
try: | |
subprocess.run(["az", "account", "show"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) | |
return True | |
except subprocess.CalledProcessError: | |
return False | |
def get_subscription_id() -> Optional[str]: | |
"""Get the current Azure subscription ID.""" | |
try: | |
result = run_az_command("az account show --query id -o json") | |
if result is None: | |
log_failure("Failed to retrieve subscription ID.") | |
return None | |
if isinstance(result, str): | |
return result.strip('"') | |
elif isinstance(result, dict) and 'id' in result: | |
return result['id'] | |
log_failure(f"Unexpected format for subscription ID: {result}") | |
return None | |
except Exception as e: | |
log_failure(f"Error in get_subscription_id: {e}") | |
return None | |
def list_resource_groups(subscription_id: str) -> List[Dict[str, Any]]: | |
""" | |
List all resource groups in the subscription. | |
Args: | |
subscription_id: The Azure subscription ID | |
Returns: | |
A list of resource group objects | |
""" | |
query = f"ResourceContainers | where type == 'microsoft.resources/subscriptions/resourcegroups' | where subscriptionId == '{subscription_id}' | project name, id" | |
try: | |
result = run_az_command(f"az graph query -q \"{query}\" --subscription {subscription_id}") | |
if result is None: | |
log_failure(f"Failed to retrieve resource groups for subscription {subscription_id}.") | |
return [] | |
if isinstance(result, dict): | |
return result.get('data', []) | |
log_failure(f"Unexpected format for resource groups: {result}") | |
return [] | |
except Exception as e: | |
log_failure(f"Error in list_resource_groups for subscription {subscription_id}: {e}") | |
return [] | |
def get_network_information(resource: Dict[str, Any], subscription_id: str) -> Dict[str, Any]: | |
"""Get network information for addressable resources""" | |
resource_type = resource.get('type', '').lower() | |
resource_group = resource.get('resourceGroup', '') | |
name = resource.get('name', '') | |
network_info = {} | |
if not resource_group or not name: | |
log_failure(f"Skipping network info for resource {name} due to missing resourceGroup or name.") | |
return network_info | |
try: | |
# Virtual Machines | |
if 'microsoft.compute/virtualmachines' in resource_type: | |
vm_details = run_az_command_list([ | |
'az', 'vm', 'show', | |
'--resource-group', resource_group, | |
'--name', name, | |
'--show-details' | |
], subscription_id) | |
if vm_details: | |
network_info.update({ | |
'privateIps': vm_details.get('privateIps', []), | |
'publicIps': vm_details.get('publicIps', []), | |
'fqdns': vm_details.get('fqdns', []) | |
}) | |
# Public IP Addresses | |
elif 'microsoft.network/publicipaddresses' in resource_type: | |
pip_details = run_az_command_list([ | |
'az', 'network', 'public-ip', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if pip_details: | |
network_info.update({ | |
'ipAddress': pip_details.get('ipAddress'), | |
'fqdn': pip_details.get('dnsSettings', {}).get('fqdn'), | |
'allocationMethod': pip_details.get('publicIPAllocationMethod') | |
}) | |
# Load Balancers | |
elif 'microsoft.network/loadbalancers' in resource_type: | |
lb_details = run_az_command_list([ | |
'az', 'network', 'lb', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if lb_details: | |
frontend_ips = [] | |
frontend_configs = lb_details.get('frontendIPConfigurations', []) | |
if isinstance(frontend_configs, list): | |
for frontend in frontend_configs: | |
public_ip_address = frontend.get('publicIPAddress') | |
if public_ip_address and isinstance(public_ip_address, dict): | |
frontend_ips.append(public_ip_address.get('id', '').split('/')[-1]) | |
else: | |
log_failure(f"Unexpected format for 'frontendIPConfigurations' in LB {name}. Expected list, got {type(frontend_configs)}.") | |
network_info['frontendIPs'] = frontend_ips | |
# App Services | |
elif 'microsoft.web/sites' in resource_type: | |
app_details = run_az_command_list([ | |
'az', 'webapp', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if app_details: | |
network_info.update({ | |
'defaultHostName': app_details.get('defaultHostName'), | |
'hostNames': app_details.get('hostNames', []), | |
'outboundIpAddresses': app_details.get('outboundIpAddresses', '').split(',') if app_details.get('outboundIpAddresses') else [] | |
}) | |
# Storage Accounts | |
elif 'microsoft.storage/storageaccounts' in resource_type: | |
storage_details = run_az_command_list([ | |
'az', 'storage', 'account', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if storage_details: | |
primary_endpoints = storage_details.get('primaryEndpoints', {}) | |
network_info['endpoints'] = primary_endpoints | |
except Exception as e: | |
log_failure(f"Error getting network info for {name} ({resource_type}): {str(e)}") | |
return network_info | |
def get_environment_variables(resource: Dict[str, Any], subscription_id: str) -> Dict[str, Any]: | |
"""Get environment variables for supported resources""" | |
resource_type = resource.get('type', '').lower() | |
resource_group = resource.get('resourceGroup', '') | |
name = resource.get('name', '') | |
env_vars = {} | |
if not resource_group or not name: | |
log_failure(f"Skipping environment variables for resource {name} due to missing resourceGroup or name.") | |
return env_vars | |
try: | |
# App Service / Web Apps | |
if 'microsoft.web/sites' in resource_type: | |
app_settings = run_az_command_list([ | |
'az', 'webapp', 'config', 'appsettings', 'list', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if isinstance(app_settings, list): | |
env_vars['appSettings'] = {setting.get('name'): setting.get('value') for setting in app_settings if isinstance(setting, dict) and setting.get('name')} | |
# Get connection strings | |
conn_strings = run_az_command_list([ | |
'az', 'webapp', 'config', 'connection-string', 'list', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if isinstance(conn_strings, list): | |
env_vars['connectionStrings'] = {cs.get('name'): cs.get('value') for cs in conn_strings if isinstance(cs, dict) and cs.get('name')} | |
# Function Apps | |
elif 'microsoft.web/sites' in resource_type and resource.get('kind', '').lower() == 'functionapp': | |
func_settings = run_az_command_list([ | |
'az', 'functionapp', 'config', 'appsettings', 'list', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if isinstance(func_settings, list): | |
env_vars['functionAppSettings'] = {setting.get('name'): setting.get('value') for setting in func_settings if isinstance(setting, dict) and setting.get('name')} | |
# Container Instances | |
elif 'microsoft.containerinstance/containergroups' in resource_type: | |
container_details = run_az_command_list([ | |
'az', 'container', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if container_details: | |
containers = container_details.get('containers', []) | |
if isinstance(containers, list): | |
for i, container in enumerate(containers): | |
if isinstance(container, dict) and container.get('environmentVariables'): | |
env_vars[f'container_{i}_env'] = { | |
env.get('name'): env.get('value', env.get('secureValue', 'SECURE_VALUE')) | |
for env in container['environmentVariables'] if isinstance(env, dict) and env.get('name') | |
} | |
else: | |
log_failure(f"Unexpected format for 'containers' in Container Group {name}. Expected list, got {type(containers)}.") | |
except Exception as e: | |
log_failure(f"Error getting environment variables for {name} ({resource_type}): {str(e)}") | |
return env_vars | |
def get_resource_specific_config(resource: Dict[str, Any], subscription_id: str) -> Dict[str, Any]: | |
"""Get resource-specific configuration details""" | |
resource_type = resource.get('type', '').lower() | |
resource_group = resource.get('resourceGroup', '') | |
name = resource.get('name', '') | |
config = {} | |
if not resource_group or not name: | |
log_failure(f"Skipping specific config for resource {name} due to missing resourceGroup or name.") | |
return config | |
try: | |
# Virtual Machines | |
if 'microsoft.compute/virtualmachines' in resource_type: | |
vm_details = run_az_command_list([ | |
'az', 'vm', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if vm_details: | |
config.update({ | |
'vmSize': vm_details.get('hardwareProfile', {}).get('vmSize'), | |
'osType': vm_details.get('storageProfile', {}).get('osDisk', {}).get('osType'), | |
'imageReference': vm_details.get('storageProfile', {}).get('imageReference'), | |
'adminUsername': vm_details.get('osProfile', {}).get('adminUsername') | |
}) | |
# Storage Accounts | |
elif 'microsoft.storage/storageaccounts' in resource_type: | |
storage_details = run_az_command_list([ | |
'az', 'storage', 'account', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if storage_details: | |
config.update({ | |
'sku': storage_details.get('sku'), | |
'accessTier': storage_details.get('accessTier'), | |
'encryption': storage_details.get('encryption'), | |
'networkRuleSet': storage_details.get('networkRuleSet') | |
}) | |
# SQL Databases | |
elif 'microsoft.sql/servers/databases' in resource_type: | |
# Extract server name from resource ID | |
resource_id_parts = resource.get('id', '').split('/') | |
server_name = '' | |
if len(resource_id_parts) > 8: | |
server_name = resource_id_parts[8] | |
if server_name: | |
db_details = run_az_command_list([ | |
'az', 'sql', 'db', 'show', | |
'--resource-group', resource_group, | |
'--server', server_name, | |
'--name', name | |
], subscription_id) | |
if db_details: | |
config.update({ | |
'edition': db_details.get('edition'), | |
'serviceLevelObjective': db_details.get('serviceLevelObjective'), | |
'maxSizeBytes': db_details.get('maxSizeBytes'), | |
'collation': db_details.get('collation') | |
}) | |
else: | |
log_failure(f"Could not extract server name from resource ID for SQL Database {name}.") | |
# Key Vaults | |
elif 'microsoft.keyvault/vaults' in resource_type: | |
kv_details = run_az_command_list([ | |
'az', 'keyvault', 'show', | |
'--resource-group', resource_group, | |
'--name', name | |
], subscription_id) | |
if kv_details: | |
properties_dict = kv_details.get('properties', {}) | |
access_policies = properties_dict.get('accessPolicies', []) | |
if not isinstance(access_policies, list): | |
log_failure(f"Unexpected format for 'accessPolicies' in Key Vault {name}. Expected list, got {type(access_policies)}.") | |
access_policies = [] | |
config.update({ | |
'sku': properties_dict.get('sku'), | |
'accessPolicies': access_policies, | |
'enabledForDeployment': properties_dict.get('enabledForDeployment'), | |
'enabledForTemplateDeployment': properties_dict.get('enabledForTemplateDeployment') | |
}) | |
except Exception as e: | |
log_failure(f"Error getting specific config for {name} ({resource_type}): {str(e)}") | |
return config | |
def list_resources_basic(subscription_id: str) -> List[Dict[str, Any]]: | |
""" | |
List all resources in the subscription with basic information. | |
Args: | |
subscription_id: The Azure subscription ID | |
Returns: | |
A list of basic resource objects | |
""" | |
query = f"Resources | where subscriptionId == '{subscription_id}' | project id, name, type, resourceGroup, kind, location, tags, properties" | |
try: | |
result = run_az_command(f"az graph query -q \"{query}\" --subscription {subscription_id}") | |
if result is None: | |
log_failure(f"Failed to retrieve basic resources for subscription {subscription_id}.") | |
return [] | |
if isinstance(result, dict): | |
return result.get('data', []) | |
log_failure(f"Unexpected format for basic resources: {result}") | |
return [] | |
except Exception as e: | |
log_failure(f"Error in list_resources_basic for subscription {subscription_id}: {e}") | |
return [] | |
def list_resources(subscription_id: str, basic_mode: bool = False) -> List[Dict[str, Any]]: | |
""" | |
List all resources in the subscription with enhanced details. | |
Args: | |
subscription_id: The Azure subscription ID | |
basic_mode: If True, only gather basic resource information | |
Returns: | |
A list of resource objects with detailed information | |
""" | |
if basic_mode: | |
print("Using basic mode - gathering resource information without detailed enhancement...") | |
return list_resources_basic(subscription_id) | |
print("Using enhanced mode - gathering detailed resource information...") | |
print("Fetching basic resource information...") | |
query = f"Resources | where subscriptionId == '{subscription_id}' | project id, name, type, resourceGroup, kind, location, tags" | |
basic_resources = [] | |
try: | |
result = run_az_command(f"az graph query -q \"{query}\" --subscription {subscription_id}") | |
if result is None: | |
log_failure(f"Failed to retrieve basic resources for subscription {subscription_id}.") | |
elif isinstance(result, dict): | |
basic_resources = result.get('data', []) | |
else: | |
log_failure(f"Unexpected format for basic resources: {result}") | |
except Exception as e: | |
log_failure(f"Error fetching basic resource information for subscription {subscription_id}: {e}") | |
enhanced_resources = [] | |
total_resources = len(basic_resources) | |
print(f"Enhancing resource details for {total_resources} resources...") | |
for i, resource in enumerate(basic_resources, 1): | |
resource_name = resource.get('name', 'Unknown') | |
resource_id = resource.get('id') | |
resource_type = resource.get('type') | |
print(f"Processing resource {i}/{total_resources}: {resource_name}") | |
if not resource_id: | |
log_failure(f"Resource {resource_name} missing 'id', skipping detailed enhancement.") | |
enhanced_resources.append(resource) | |
continue | |
if not resource_type: | |
log_failure(f"Resource {resource_name} ({resource_id}) missing 'type', skipping detailed enhancement.") | |
enhanced_resources.append(resource) | |
continue | |
try: | |
# Get enhanced resource details | |
enhanced_resource = resource.copy() | |
# Add network information | |
network_info = get_network_information(resource, subscription_id) | |
if network_info: | |
enhanced_resource['networkInfo'] = network_info | |
# Add environment variables | |
env_vars = get_environment_variables(resource, subscription_id) | |
if env_vars: | |
enhanced_resource['environmentVariables'] = env_vars | |
# Add resource-specific configurations | |
specific_config = get_resource_specific_config(resource, subscription_id) | |
if specific_config: | |
enhanced_resource['specificConfiguration'] = specific_config | |
# Get properties from Resource Graph for dependency analysis | |
properties_query = f"Resources | where id == '{resource_id}' | project properties" | |
try: | |
properties_result = run_az_command(f"az graph query -q \"{properties_query}\" --subscription {subscription_id}") | |
if properties_result is not None and isinstance(properties_result, dict) and properties_result.get('data'): | |
if properties_result['data'] and isinstance(properties_result['data'][0], dict): | |
enhanced_resource['properties'] = properties_result['data'][0].get('properties', {}) | |
else: | |
log_failure(f"Properties query for {resource_id} returned unexpected data format: {properties_result}") | |
elif properties_result is not None: | |
log_failure(f"Properties query for {resource_id} returned no data or unexpected type: {properties_result}") | |
else: | |
log_failure(f"Failed to retrieve properties for {resource_id}.") | |
except Exception as e: | |
log_failure(f"Error retrieving properties for {resource_id}: {e}") | |
enhanced_resources.append(enhanced_resource) | |
except Exception as e: | |
log_failure(f"Error enhancing resource {resource_name} ({resource_id}): {str(e)}") | |
# Add basic resource info if enhancement fails | |
enhanced_resources.append(resource) | |
print(f"Enhanced {len(enhanced_resources)} resources with detailed information") | |
return enhanced_resources | |
def get_resource_dependencies(subscription_id: str, resources: List[Dict[str, Any]]) -> Tuple[Dict[str, Set[str]], Dict[str, Set[str]]]: | |
""" | |
Identify dependencies between resources using enhanced resource information. | |
Args: | |
subscription_id: The Azure subscription ID | |
resources: List of enhanced resources to analyze | |
Returns: | |
A tuple containing (confirmed_dependencies, potential_dependencies) dictionaries mapping resource IDs to sets of dependent resource IDs | |
""" | |
confirmed_dependencies: Dict[str, Set[str]] = {} | |
potential_dependencies: Dict[str, Set[str]] = {} | |
# Initialize empty dependency sets for all resources | |
for resource in resources: | |
resource_id = resource.get('id') | |
if resource_id: | |
confirmed_dependencies[resource_id] = set() | |
potential_dependencies[resource_id] = set() | |
else: | |
log_failure(f"Resource missing 'id', cannot initialize for dependency tracking: {resource.get('name', 'Unknown')}") | |
print("Analyzing resource dependencies with enhanced information...") | |
# Check for different types of dependencies | |
for resource in resources: | |
resource_id = resource.get('id') | |
resource_type = resource.get('type') | |
if not resource_id: | |
log_failure(f"Resource missing 'id', skipping dependency analysis for {resource.get('name', 'Unknown')}.") | |
continue | |
if not resource_type: | |
log_failure(f"Resource {resource_id} missing 'type', skipping dependency analysis.") | |
continue | |
# Use enhanced properties if available, otherwise use basic properties | |
properties = resource.get('properties', {}) | |
# Enhanced dependency detection using environment variables | |
env_vars = resource.get('environmentVariables', {}) | |
# Enhanced dependency detection using network information | |
network_info = resource.get('networkInfo', {}) | |
# Enhanced dependency detection using specific configurations | |
specific_config = resource.get('specificConfiguration', {}) | |
# Look for resource IDs in properties, environment variables, and configurations | |
all_resource_data = json.dumps({ | |
'properties': properties, | |
'environmentVariables': env_vars, | |
'specificConfiguration': specific_config, | |
'networkInfo': network_info | |
}) | |
for potential_dep in resources: | |
potential_dep_id = potential_dep.get('id') | |
if potential_dep_id and potential_dep_id != resource_id and potential_dep_id in all_resource_data: | |
confirmed_dependencies[resource_id].add(potential_dep_id) | |
# Enhanced dependency detection using environment variables (potential dependencies) | |
if env_vars: | |
# App Settings dependencies | |
app_settings = env_vars.get('appSettings', {}) | |
for setting_name, setting_value in app_settings.items(): | |
if isinstance(setting_value, str): | |
# Look for resource names in connection strings, URLs, and other settings | |
for potential_dep in resources: | |
dep_name = potential_dep.get('name', '') | |
if dep_name and len(dep_name) > 3: # Avoid matching very short names | |
# Direct name match | |
if dep_name in setting_value: | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# For URLs, also check if the resource name appears as a subdomain or hostname | |
elif dep_name.replace('-', '') in setting_value.replace('-', ''): | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# Connection strings dependencies | |
conn_strings = env_vars.get('connectionStrings', {}) | |
for conn_name, conn_value in conn_strings.items(): | |
if isinstance(conn_value, str): | |
# Look for server names or resource names in connection strings | |
for potential_dep in resources: | |
dep_name = potential_dep.get('name', '') | |
if dep_name and len(dep_name) > 3: # Avoid matching very short names | |
# Direct name match | |
if dep_name in conn_value: | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# For URLs and connection strings, also check without hyphens | |
elif dep_name.replace('-', '') in conn_value.replace('-', ''): | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# Enhanced dependency detection using network information (potential dependencies) | |
if network_info: | |
# Look for dependencies based on hostnames, endpoints, etc. | |
for key, value in network_info.items(): | |
if isinstance(value, (str, list)): | |
value_str = json.dumps(value) if isinstance(value, list) else value | |
for potential_dep in resources: | |
dep_name = potential_dep.get('name', '') | |
if dep_name and dep_name in value_str: | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# Type-specific dependency detection (confirmed dependencies from properties) | |
if resource_type == 'microsoft.app/containerapps': | |
# Container Apps depend on their environment and registry | |
managed_environment_id = properties.get('managedEnvironmentId') | |
if managed_environment_id: | |
confirmed_dependencies[resource_id].add(managed_environment_id) | |
# Container Apps secret references and registry dependencies | |
config = properties.get('configuration') | |
if isinstance(config, dict): | |
# Registry dependencies | |
registries = config.get('registries', []) | |
if not isinstance(registries, list): | |
log_failure(f"Unexpected format for 'registries' in resource {resource_id}. Expected list, got {type(registries)}. Skipping registry dependencies.") | |
registries = [] # Ensure it's an empty list to prevent TypeError | |
for registry in registries: | |
if isinstance(registry, dict): | |
server = registry.get('server', '') | |
# Find the ACR resource by matching the login server | |
for potential_acr in resources: | |
potential_acr_type = potential_acr.get('type') | |
if potential_acr_type == 'microsoft.containerregistry/registries': | |
try: | |
acr_query = f"Resources | where id == '{potential_acr.get('id')}' | project properties.loginServer" | |
acr_result = run_az_command(f"az graph query -q \"{acr_query}\" --subscription {subscription_id}") | |
if acr_result is not None and isinstance(acr_result, dict) and acr_result.get('data'): | |
if acr_result['data'] and isinstance(acr_result['data'][0], dict): | |
if acr_result['data'][0].get('properties.loginServer') == server: | |
confirmed_dependencies[resource_id].add(potential_acr['id']) | |
else: | |
log_failure(f"ACR query for {potential_acr.get('id')} returned unexpected data format: {acr_result}") | |
elif acr_result is not None: | |
log_failure(f"ACR query for {potential_acr.get('id')} returned no data or unexpected type: {acr_result}") | |
else: | |
log_failure(f"Failed to retrieve ACR login server for {potential_acr.get('id')}.") | |
except Exception as e: | |
log_failure(f"Error retrieving ACR login server for {potential_acr.get('id')}: {e}") | |
# Secret references that might point to Key Vault or other services | |
secrets = config.get('secrets', []) | |
if not isinstance(secrets, list): | |
log_failure(f"Unexpected format for 'secrets' in resource {resource_id}. Expected list, got {type(secrets)}. Skipping secret dependencies.") | |
secrets = [] | |
for secret in secrets: | |
if isinstance(secret, dict): | |
secret_name = secret.get('name', '') | |
# Common patterns for secret names that indicate dependencies | |
if 'keyvault' in secret_name.lower() or 'kv' in secret_name.lower(): | |
for potential_kv in resources: | |
if potential_kv.get('type') == 'microsoft.keyvault/vaults': | |
potential_dependencies[resource_id].add(potential_kv['id']) | |
elif 'storage' in secret_name.lower(): | |
for potential_storage in resources: | |
if potential_storage.get('type') == 'microsoft.storage/storageaccounts': | |
potential_dependencies[resource_id].add(potential_storage['id']) | |
elif 'database' in secret_name.lower() or 'db' in secret_name.lower(): | |
for potential_db in resources: | |
potential_db_type = potential_db.get('type') | |
if potential_db_type in ['microsoft.sql/servers', 'microsoft.documentdb/databaseaccounts', 'microsoft.dbforpostgresql/flexibleservers']: | |
potential_dependencies[resource_id].add(potential_db['id']) | |
# Environment variable secret references | |
template = properties.get('template') | |
if isinstance(template, dict): | |
containers = template.get('containers', []) | |
if not isinstance(containers, list): | |
log_failure(f"Unexpected format for 'containers' in resource {resource_id} template. Expected list, got {type(containers)}. Skipping container env dependencies.") | |
containers = [] | |
for container in containers: | |
if isinstance(container, dict) and 'env' in container: | |
env_list = container.get('env', []) | |
if not isinstance(env_list, list): | |
log_failure(f"Unexpected format for 'env' in container of resource {resource_id}. Expected list, got {type(env_list)}. Skipping env var dependencies.") | |
env_list = [] | |
for env_var in env_list: | |
if isinstance(env_var, dict) and 'secretRef' in env_var: | |
secret_ref = env_var['secretRef'] | |
# Analyze secret reference patterns | |
if 'appinsights' in secret_ref.lower(): | |
for potential_insights in resources: | |
if potential_insights.get('type') == 'microsoft.insights/components': | |
potential_dependencies[resource_id].add(potential_insights['id']) | |
elif 'webpubsub' in secret_ref.lower() or 'signalr' in secret_ref.lower(): | |
for potential_signalr in resources: | |
potential_signalr_type = potential_signalr.get('type') | |
if potential_signalr_type in ['microsoft.signalrservice/signalr', 'microsoft.signalrservice/webpubsub']: | |
potential_dependencies[resource_id].add(potential_signalr['id']) | |
elif resource_type == 'microsoft.web/sites': | |
# Web Apps often depend on App Service Plans (confirmed dependency) | |
server_farm_id = properties.get('serverFarmId') | |
if server_farm_id: | |
confirmed_dependencies[resource_id].add(server_farm_id) | |
# Enhanced App Insights detection using environment variables (potential dependencies) | |
if env_vars: | |
app_settings = env_vars.get('appSettings', {}) | |
# Look for Application Insights connection strings | |
for setting_name, setting_value in app_settings.items(): | |
if isinstance(setting_value, str) and 'APPLICATIONINSIGHTS' in setting_name.upper(): | |
for potential_insights in resources: | |
if potential_insights.get('type') == 'microsoft.insights/components': | |
if potential_insights.get('name') and potential_insights['name'] in setting_value: | |
potential_dependencies[resource_id].add(potential_insights['id']) | |
# Look for database connection strings | |
conn_strings = env_vars.get('connectionStrings', {}) | |
for conn_name, conn_value in conn_strings.items(): | |
if isinstance(conn_value, str): | |
# SQL Database dependencies | |
if 'database.windows.net' in conn_value or 'sql.azuresynapse.net' in conn_value: | |
for potential_sql in resources: | |
potential_sql_type = potential_sql.get('type') | |
if potential_sql_type in ['microsoft.sql/servers', 'microsoft.sql/servers/databases']: | |
if potential_sql.get('name') and potential_sql['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_sql['id']) | |
# Storage Account dependencies | |
if 'blob.core.windows.net' in conn_value or 'table.core.windows.net' in conn_value or 'queue.core.windows.net' in conn_value or 'file.core.windows.net' in conn_value: | |
for potential_storage in resources: | |
if potential_storage.get('type') == 'microsoft.storage/storageaccounts': | |
if potential_storage.get('name') and potential_storage['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_storage['id']) | |
# Cosmos DB dependencies (MongoDB, SQL API, etc.) | |
if 'cosmos.azure.com' in conn_value or 'documents.azure.com' in conn_value: | |
for potential_cosmos in resources: | |
if potential_cosmos.get('type') == 'microsoft.documentdb/databaseaccounts': | |
if potential_cosmos.get('name') and potential_cosmos['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_cosmos['id']) | |
# PostgreSQL dependencies | |
if 'postgres.database.azure.com' in conn_value: | |
for potential_pg in resources: | |
potential_pg_type = potential_pg.get('type') | |
if potential_pg_type in ['microsoft.dbforpostgresql/servers', 'microsoft.dbforpostgresql/flexibleservers']: | |
if potential_pg.get('name') and potential_pg['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_pg['id']) | |
# MySQL dependencies | |
if 'mysql.database.azure.com' in conn_value: | |
for potential_mysql in resources: | |
potential_mysql_type = potential_mysql.get('type') | |
if potential_mysql_type in ['microsoft.dbformysql/servers', 'microsoft.dbformysql/flexibleservers']: | |
if potential_mysql.get('name') and potential_mysql['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_mysql['id']) | |
# Service Bus dependencies | |
if 'servicebus.windows.net' in conn_value: | |
for potential_sb in resources: | |
if potential_sb.get('type') == 'microsoft.servicebus/namespaces': | |
if potential_sb.get('name') and potential_sb['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_sb['id']) | |
# Event Hub dependencies | |
if 'servicebus.windows.net' in conn_value and 'EntityPath=' in conn_value: | |
for potential_eh in resources: | |
if potential_eh.get('type') == 'microsoft.eventhub/namespaces': | |
if potential_eh.get('name') and potential_eh['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_eh['id']) | |
# Redis Cache dependencies | |
if 'redis.cache.windows.net' in conn_value: | |
for potential_redis in resources: | |
if potential_redis.get('type') == 'microsoft.cache/redis': | |
if potential_redis.get('name') and potential_redis['name'] in conn_value: | |
potential_dependencies[resource_id].add(potential_redis['id']) | |
# Enhanced detection for Key Vault secret references | |
if env_vars: | |
app_settings = env_vars.get('appSettings', {}) | |
for setting_name, setting_value in app_settings.items(): | |
if isinstance(setting_value, str): | |
# Key Vault secret references (format: @Microsoft.KeyVault(SecretUri=...)) | |
if '@Microsoft.KeyVault' in setting_value or 'vault.azure.net' in setting_value: | |
for potential_kv in resources: | |
if potential_kv.get('type') == 'microsoft.keyvault/vaults': | |
if potential_kv.get('name') and potential_kv['name'] in setting_value: | |
potential_dependencies[resource_id].add(potential_kv['id']) | |
# Service Bus connection strings in app settings | |
if 'servicebus.windows.net' in setting_value: | |
for potential_sb in resources: | |
if potential_sb.get('type') == 'microsoft.servicebus/namespaces': | |
if potential_sb.get('name') and potential_sb['name'] in setting_value: | |
potential_dependencies[resource_id].add(potential_sb['id']) | |
# SignalR/Web PubSub connection strings | |
if 'webpubsub.azure.com' in setting_value or 'service.signalr.net' in setting_value: | |
for potential_signalr in resources: | |
potential_signalr_type = potential_signalr.get('type') | |
if potential_signalr_type in ['microsoft.signalrservice/signalr', 'microsoft.signalrservice/webpubsub']: | |
if potential_signalr.get('name') and potential_signalr['name'] in setting_value: | |
potential_dependencies[resource_id].add(potential_signalr['id']) | |
# Check for App Insights connection in properties (confirmed dependency) | |
if properties and 'siteConfig' in properties and properties.get('siteConfig') is not None: | |
site_config = properties.get('siteConfig', {}) | |
app_settings = site_config.get('appSettings', []) # Ensure default is list | |
if app_settings and isinstance(app_settings, list): | |
for setting in app_settings: | |
if isinstance(setting, dict) and setting.get('name') == 'APPLICATIONINSIGHTS_CONNECTION_STRING' and 'value' in setting: | |
conn_string = setting['value'] | |
for potential_insights in resources: | |
if potential_insights.get('type') == 'microsoft.insights/components' and potential_insights.get('name') and potential_insights['name'] in conn_string: | |
confirmed_dependencies[resource_id].add(potential_insights['id']) | |
elif resource_type == 'microsoft.insights/components': | |
# Application Insights may depend on storage accounts for logs (potential dependency) | |
if specific_config: | |
for potential_storage in resources: | |
if potential_storage.get('type') == 'microsoft.storage/storageaccounts': | |
# Check if storage account is referenced in App Insights config | |
if potential_storage.get('name') and potential_storage['name'] in json.dumps(specific_config): | |
potential_dependencies[resource_id].add(potential_storage['id']) | |
elif resource_type == 'microsoft.apimanagement/service': | |
# APIM often connected to App Insights (potential dependency) | |
for potential_insights in resources: | |
if potential_insights.get('type') == 'microsoft.insights/components': | |
potential_dependencies[resource_id].add(potential_insights['id']) | |
# Enhanced APIM dependency detection using configuration (potential dependencies) | |
if specific_config: | |
# Look for backend services, named values, etc. | |
config_str = json.dumps(specific_config) | |
for potential_dep in resources: | |
potential_dep_type = potential_dep.get('type') | |
if potential_dep_type in ['microsoft.web/sites', 'microsoft.storage/storageaccounts', 'microsoft.keyvault/vaults']: | |
if potential_dep.get('name') and potential_dep['name'] in config_str: | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
elif resource_type == 'microsoft.keyvault/vaults': | |
# Key Vault may be referenced by other services | |
# This is typically a reverse dependency, but we can detect some patterns | |
if specific_config: | |
access_policies = specific_config.get('accessPolicies', []) | |
if not isinstance(access_policies, list): | |
log_failure(f"Unexpected format for 'accessPolicies' in Key Vault {resource_id}. Expected list, got {type(access_policies)}.") | |
access_policies = [] | |
for policy in access_policies: | |
if isinstance(policy, dict) and 'objectId' in policy: | |
# Could potentially match this to service principals of other resources | |
pass | |
elif resource_type == 'microsoft.compute/virtualmachines': | |
# VMs depend on various resources (potential dependencies from config) | |
if specific_config: | |
# Check for dependencies on storage accounts (for diagnostics, disks) | |
config_str = json.dumps(specific_config) | |
for potential_storage in resources: | |
if potential_storage.get('type') == 'microsoft.storage/storageaccounts': | |
if potential_storage.get('name') and potential_storage['name'] in config_str: | |
potential_dependencies[resource_id].add(potential_storage['id']) | |
# VMs depend on their network interfaces (confirmed dependency) | |
if properties and 'networkProfile' in properties: | |
network_interfaces = properties.get('networkProfile', {}).get('networkInterfaces', []) | |
if not isinstance(network_interfaces, list): | |
log_failure(f"Unexpected format for 'networkInterfaces' in VM {resource_id}. Expected list, got {type(network_interfaces)}.") | |
network_interfaces = [] | |
for nic in network_interfaces: | |
if isinstance(nic, dict) and 'id' in nic: | |
confirmed_dependencies[resource_id].add(nic['id']) | |
elif resource_type == 'microsoft.storage/storageaccounts': | |
# Storage accounts may have network restrictions pointing to VNets (confirmed dependency) | |
if specific_config and 'networkRuleSet' in specific_config: | |
network_rules = specific_config.get('networkRuleSet', {}) | |
virtual_network_rules = network_rules.get('virtualNetworkRules', []) | |
if not isinstance(virtual_network_rules, list): | |
log_failure(f"Unexpected format for 'virtualNetworkRules' in Storage Account {resource_id}. Expected list, got {type(virtual_network_rules)}.") | |
virtual_network_rules = [] | |
for vnet_rule in virtual_network_rules: | |
if isinstance(vnet_rule, dict) and 'id' in vnet_rule: | |
# This points to a subnet, we need to find the parent VNet | |
subnet_id = vnet_rule['id'] | |
for potential_vnet in resources: | |
if potential_vnet.get('type') == 'microsoft.network/virtualnetworks': | |
if potential_vnet.get('id') and potential_vnet['id'] in subnet_id: | |
confirmed_dependencies[resource_id].add(potential_vnet['id']) | |
# Add common implicit dependencies based on resource types (potential dependencies) | |
for resource in resources: | |
resource_id = resource.get('id') | |
resource_type = resource.get('type') | |
if not resource_id: | |
continue | |
# Add implicit dependency for dashboard -> App Insights | |
if resource_type == 'microsoft.portal/dashboards': | |
for potential_dep in resources: | |
if potential_dep.get('type') == 'microsoft.insights/components': | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
# Container App Environment -> App Insights | |
if resource_type == 'microsoft.app/managedenvironments': | |
for potential_dep in resources: | |
if potential_dep.get('type') == 'microsoft.insights/components': | |
potential_dependencies[resource_id].add(potential_dep['id']) | |
return confirmed_dependencies, potential_dependencies | |
def get_resource_data(subscription_id: Optional[str] = None, basic_mode: bool = False) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Set[str]], Dict[str, Set[str]]]: | |
""" | |
Collect all Azure resource data needed for visualization. | |
Args: | |
subscription_id: The Azure subscription ID (if None, uses current subscription) | |
basic_mode: If True, only gather basic resource information | |
Returns: | |
A tuple containing (subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies) | |
""" | |
# Check prerequisites | |
if not check_az_cli_installed(): | |
print("Error: Azure CLI not found. Please install it first: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli") | |
sys.exit(1) | |
if not check_az_cli_logged_in(): | |
print("Error: Not logged in to Azure. Please run 'az login' first.") | |
sys.exit(1) | |
# Get subscription ID | |
if not subscription_id: | |
subscription_id = get_subscription_id() | |
if not subscription_id: | |
print("Error: Could not determine Azure subscription ID. Exiting.") | |
sys.exit(1) | |
truncated_sub_id = truncate_subscription_id(subscription_id) | |
print(f"Using subscription: {truncated_sub_id}") | |
# Get resources and resource groups | |
print("Fetching resource groups...") | |
resource_groups = list_resource_groups(subscription_id) | |
print(f"Found {len(resource_groups)} resource groups") | |
# Get resources (basic or enhanced mode) | |
resources = list_resources(subscription_id, basic_mode) | |
print(f"Found {len(resources)} resources {'(basic mode)' if basic_mode else '(enhanced mode)'}") | |
# Analyze dependencies using enhanced resource data | |
confirmed_dependencies, potential_dependencies = get_resource_dependencies(subscription_id, resources) | |
# Print summary of enhanced data collected | |
enhanced_count = sum(1 for r in resources if any(key in r for key in ['networkInfo', 'environmentVariables', 'specificConfiguration'])) | |
print(f"Enhanced {enhanced_count}/{len(resources)} resources with detailed information") | |
# Print dependency summary | |
total_confirmed_deps = sum(len(deps) for deps in confirmed_dependencies.values()) | |
total_potential_deps = sum(len(deps) for deps in potential_dependencies.values()) | |
print(f"Discovered {total_confirmed_deps} confirmed dependencies and {total_potential_deps} potential dependencies") | |
return subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies | |
def main(): | |
parser = argparse.ArgumentParser(description='Generate Azure resource dependency graph with enhanced resource discovery') | |
parser.add_argument('--subscription', '-s', type=str, help='Azure subscription ID (if not provided, uses current subscription)') | |
parser.add_argument('--no-potential-deps', action='store_true', help='Exclude potential dependencies from the diagram') | |
parser.add_argument('--output', '-o', type=str, default='azure_resource_graph', help='Output file path prefix') | |
parser.add_argument('--no-data', action='store_true', help='Use existing data file instead of fetching from Azure') | |
parser.add_argument('--no-html', action='store_true', help='Skip HTML visualization generation') | |
parser.add_argument('--no-md', action='store_true', help='Skip Markdown visualization generation') | |
parser.add_argument('--basic-mode', action='store_true', help='Use basic resource discovery without enhanced details (faster but less comprehensive)') | |
args = parser.parse_args() | |
# Determine whether to collect data or use existing file | |
data_file = f"{args.output}.json" | |
subscription_id: Optional[str] = args.subscription | |
resources: List[Dict[str, Any]] = [] | |
resource_groups: List[Dict[str, Any]] = [] | |
confirmed_dependencies: Dict[str, Set[str]] = {} | |
potential_dependencies: Dict[str, Set[str]] = {} | |
if not args.no_data: | |
# Query Azure for resource data | |
subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies = get_resource_data(subscription_id, args.basic_mode) | |
# Save data to file | |
deps_serializable = {k: list(v) for k, v in confirmed_dependencies.items()} | |
total_confirmed_deps = sum(len(deps) for deps in confirmed_dependencies.values()) | |
total_potential_deps = sum(len(deps) for deps in potential_dependencies.values()) | |
data = { | |
'subscription_id': subscription_id, | |
'resources': resources, | |
'resource_groups': resource_groups, | |
'confirmed_dependencies': deps_serializable, | |
'potential_dependencies': {k: list(v) for k, v in potential_dependencies.items()}, | |
'metadata': { | |
'enhanced_mode': not args.basic_mode, | |
'total_resources': len(resources), | |
'total_confirmed_dependencies': total_confirmed_deps, | |
'total_potential_dependencies': total_potential_deps, | |
'enhanced_resources': sum(1 for r in resources if any(key in r for key in ['networkInfo', 'environmentVariables', 'specificConfiguration'])) if not args.basic_mode else 0 | |
} | |
} | |
try: | |
with open(data_file, 'w') as f: | |
json.dump(data, f, indent=2) | |
print(f"Resource data saved to {data_file}") | |
if not args.basic_mode: | |
print(f"Enhanced data includes network info, environment variables, and detailed configurations") | |
except IOError as e: | |
log_failure(f"Error saving data to file {data_file}: {e}") | |
else: | |
try: | |
print(f"Loading resource data from {data_file}...") | |
with open(data_file, 'r') as f: | |
data = json.load(f) | |
subscription_id = data.get('subscription_id') | |
resources = data.get('resources', []) | |
resource_groups = data.get('resource_groups', []) | |
confirmed_dependencies = {k: set(v) for k, v in data.get('confirmed_dependencies', {}).items()} | |
potential_dependencies = {k: set(v) for k, v in data.get('potential_dependencies', {}).items()} | |
if not subscription_id: | |
log_failure(f"Loaded data from {data_file} is missing 'subscription_id'. Falling back to querying Azure.") | |
subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies = get_resource_data(args.subscription, args.basic_mode) | |
except (FileNotFoundError, json.JSONDecodeError) as e: | |
log_failure(f"Error loading data file {data_file}: {e}. Falling back to querying Azure...") | |
subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies = get_resource_data(args.subscription, args.basic_mode) | |
except Exception as e: | |
log_failure(f"An unexpected error occurred while loading data from {data_file}: {e}. Falling back to querying Azure...") | |
subscription_id, resources, resource_groups, confirmed_dependencies, potential_dependencies = get_resource_data(args.subscription, args.basic_mode) | |
include_potential_deps = not args.no_potential_deps | |
# Generate HTML output if requested | |
if not args.no_html: | |
try: | |
from arg_html import generate_html_diagram | |
html_file = f"{args.output}.html" | |
print("Generating interactive HTML diagram...") | |
generate_html_diagram( | |
subscription_id, | |
resources, | |
resource_groups, | |
confirmed_dependencies, | |
potential_dependencies, | |
include_potential_deps, | |
html_file | |
) | |
print(f"HTML diagram saved to {html_file}") | |
except ImportError: | |
log_failure("Could not import 'arg_html'. HTML visualization will be skipped. Ensure 'arg_html.py' is in the same directory.") | |
except Exception as e: | |
log_failure(f"Error generating HTML diagram: {e}") | |
# Generate Markdown output if requested | |
if not args.no_md: | |
try: | |
from arg_mermaid import generate_mermaid_diagram | |
md_file = f"{args.output}.md" | |
print("Generating mermaid diagram...") | |
mermaid_diagram = generate_mermaid_diagram( | |
subscription_id, | |
resources, | |
resource_groups, | |
confirmed_dependencies, | |
potential_dependencies, | |
include_potential_deps | |
) | |
# Save to file | |
with open(md_file, 'w') as f: | |
f.write("# Azure Resources Dependency Graph\n\n") | |
f.write("This diagram shows the resources in your Azure subscription and their dependencies.\n\n") | |
f.write("- Resources are displayed with their proper Azure display names and resource names\n") | |
f.write("- Resource types and kinds are included for better identification\n") | |
f.write("- Solid lines represent confirmed dependencies\n") | |
if include_potential_deps: | |
f.write("- Dotted lines represent potential dependencies based on common patterns\n") | |
f.write("\n```mermaid\n") | |
f.write(mermaid_diagram) | |
f.write("\n```\n") | |
print(f"Markdown diagram saved to {md_file}") | |
except ImportError: | |
log_failure("Could not import 'arg_mermaid'. Markdown visualization will be skipped. Ensure 'arg_mermaid.py' is in the same directory.") | |
except Exception as e: | |
log_failure(f"Error generating Markdown diagram: {e}") | |
# Print final summary | |
mode_msg = "basic mode (faster, less comprehensive)" if args.basic_mode else "enhanced mode (comprehensive resource analysis)" | |
print(f"\n✅ Resource graph generation completed using {mode_msg}") | |
if include_potential_deps: | |
print("Included both confirmed and potential dependencies.") | |
else: | |
print("Included only confirmed dependencies. Potential dependencies were excluded.") | |
if not args.basic_mode: | |
print("Enhanced features included:") | |
print(" - Network information (IPs, hostnames, endpoints)") | |
print(" - Environment variables and configuration settings") | |
print(" - Resource-specific detailed configurations") | |
print(" - Advanced dependency detection using configuration data") | |
else: | |
print("To get more detailed resource information and better dependency detection, run without --basic-mode") | |
if _failed_operations_log: | |
print("\n--- Summary of Failed Operations (Non-Fatal) ---") | |
for msg in _failed_operations_log: | |
print(f"- {msg}") | |
print("-------------------------------------------------") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment