Created
January 6, 2026 12:59
-
-
Save 7effrey89/d1bb5ca5533f606800ca341ac495d72c to your computer and use it in GitHub Desktop.
Powerbi Scanner API - returns assets in a Fabric Workspace - https://learn.microsoft.com/en-us/rest/api/power-bi/admin/workspace-info-get-scan-result
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Reference: https://learn.microsoft.com/en-us/rest/api/power-bi/admin/workspace-info-get-scan-result | |
| # Returns the assets in a Fabric workspace. | |
| import time | |
| import json | |
| import os | |
| import requests | |
| from azure.identity import DefaultAzureCredential | |
# ========= CONFIG =========
# Scope passed to the credential when requesting an access token.
SCOPE = "https://analysis.windows.net/powerbi/api/.default"
# Base URL that every endpoint below is built from.
PBI_BASE = "https://api.powerbi.com/v1.0/myorg"
SCAN_START = PBI_BASE + "/admin/workspaces/getInfo"


# When a scan is started, the API returns an id; we poll and then fetch results:
def SCAN_STATUS(scan_id):
    """Return the URL used to poll the status of the scan *scan_id*."""
    return "{}/admin/workspaces/scanStatus/{}".format(PBI_BASE, scan_id)


def SCAN_RESULT(scan_id):
    """Return the URL used to download the result of the scan *scan_id*."""
    return "{}/admin/workspaces/scanResult/{}".format(PBI_BASE, scan_id)


# Directory that receives the JSON dumps of raw API responses.
RAW_LOG_DIR = "raw_api_logs"
# ========= AUTH =========
# Single credential object, created at import time and shared by get_token().
credential = DefaultAzureCredential()


def get_token():
    """Return the access-token string for the Power BI API.

    The token is requested for ``SCOPE`` from the module-level
    ``credential`` (Azure default credentials).
    """
    return credential.get_token(SCOPE).token
def _write_log(file_name, payload):
    """Dump *payload* as indented JSON into ``RAW_LOG_DIR/file_name``.

    The log directory is created if it does not exist yet. An existing
    file with the same name is overwritten.

    Returns:
        The path of the file that was written.
    """
    destination = os.path.join(RAW_LOG_DIR, file_name)
    os.makedirs(RAW_LOG_DIR, exist_ok=True)
    with open(destination, "w", encoding="utf-8") as log_file:
        # ensure_ascii=False keeps non-ASCII characters readable in the log.
        json.dump(payload, log_file, indent=2, ensure_ascii=False)
    return destination
def _retry_after_seconds(resp, default=5):
    """Return the throttling delay, in whole seconds, requested by *resp*.

    Falls back to *default* when the ``Retry-After`` header is missing or is
    not a plain integer (the header may also carry an HTTP-date), so a
    throttled call never crashes on header parsing. Negative values are
    clamped to zero because ``time.sleep`` rejects them.
    """
    raw_value = resp.headers.get("Retry-After")
    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        return default


def _pbi_request(method, url, token, max_retries, log_file_name, timeout, **request_kwargs):
    """Send one authenticated request, retrying while the API answers 429.

    Shared implementation behind ``pbi_post`` and ``pbi_get``; extra keyword
    arguments are forwarded to ``requests.request``.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    for attempt in range(max_retries):
        resp = requests.request(
            method, url, headers=auth_headers, timeout=timeout, **request_kwargs
        )
        if resp.status_code == 429:
            # Throttled: wait as long as the server asks, plus a little more
            # on every further attempt.
            time.sleep(_retry_after_seconds(resp) + attempt)
            continue
        if resp.status_code >= 400:
            raise RuntimeError(f"{method} {url} failed: {resp.status_code} {resp.text}")
        resp_json = resp.json()
        if log_file_name:
            _write_log(log_file_name, resp_json)
        return resp_json
    raise RuntimeError("Exceeded retries due to throttling.")


def pbi_post(url, token, payload=None, max_retries=5, log_file_name=None, timeout=60):
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Args:
        url: Endpoint to call.
        token: Bearer token sent in the ``Authorization`` header.
        payload: JSON-serializable request body; an empty object is sent
            when it is ``None`` or otherwise falsy.
        max_retries: Maximum number of attempts while the API answers 429.
        log_file_name: When set, the response JSON is also written to this
            file via ``_write_log``.
        timeout: Seconds handed to ``requests`` as the request timeout, so a
            stalled connection cannot block forever. Pass ``None`` to wait
            indefinitely.

    Raises:
        RuntimeError: On any status code >= 400 other than 429, or when every
            attempt was throttled.
    """
    return _pbi_request(
        "POST", url, token, max_retries, log_file_name, timeout, json=payload or {}
    )


def pbi_get(url, token, max_retries=5, log_file_name=None, timeout=60):
    """GET *url* and return the decoded JSON response.

    Args:
        url: Endpoint to call.
        token: Bearer token sent in the ``Authorization`` header.
        max_retries: Maximum number of attempts while the API answers 429.
        log_file_name: When set, the response JSON is also written to this
            file via ``_write_log``.
        timeout: Seconds handed to ``requests`` as the request timeout, so a
            stalled connection cannot block forever. Pass ``None`` to wait
            indefinitely.

    Raises:
        RuntimeError: On any status code >= 400 other than 429, or when every
            attempt was throttled.
    """
    return _pbi_request("GET", url, token, max_retries, log_file_name, timeout)
def start_scan(token, workspace_ids):
    """Request a scan for the given workspace IDs and return the API response.

    Only the first 100 entries of *workspace_ids* are submitted; any further
    IDs are not part of the request. The endpoint and the payload are printed,
    and the response is also logged to ``start_scan.json``.
    """
    print(f"Calling: {SCAN_START}")
    # API limit: max 100 workspace IDs
    capped_ids = workspace_ids[:100]
    request_body = dict(
        workspaces=capped_ids,
        lineage=True,
        datasourceDetails=False,
        datasetSchema=False,
        datasetExpressions=False,
        getArtifactUsers=True,
    )
    print(f"Payload: {json.dumps(request_body, indent=2)}")
    return pbi_post(SCAN_START, token, request_body, log_file_name="start_scan.json")
def wait_for_scan_complete(scan_id, token, timeout_sec=900, poll_interval=5):
    """Poll the scan status until it reports "Succeeded" or "Failed".

    Every status response is logged to its own numbered file,
    ``scan_status_<scan_id>_<n>.json``.

    Args:
        scan_id: Identifier of the scan to poll.
        token: Bearer token used for the status requests.
        timeout_sec: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        The status payload whose ``status`` is "Succeeded" or "Failed".

    Raises:
        TimeoutError: If no terminal status was seen before the deadline.
    """
    # Use the monotonic clock so that system clock adjustments cannot
    # shorten or extend the timeout.
    deadline = time.monotonic() + timeout_sec
    poll_count = 0
    while time.monotonic() < deadline:
        status = pbi_get(
            SCAN_STATUS(scan_id),
            token,
            log_file_name=f"scan_status_{scan_id}_{poll_count}.json",
        )
        poll_count += 1
        if status.get("status") in ("Succeeded", "Failed"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError("Scan did not complete in time")
def download_scan_result(scan_id, token):
    """Fetch the result of the scan *scan_id* and return the decoded JSON.

    The payload is also logged to ``scan_result_<scan_id>.json``.
    """
    result_url = SCAN_RESULT(scan_id)
    log_name = f"scan_result_{scan_id}.json"
    return pbi_get(result_url, token, log_file_name=log_name)
def main():
    """Scan every modified workspace and print the raw scan results.

    The modified-workspace list is fetched first. Because ``start_scan``
    submits at most 100 workspace IDs per request, the IDs are processed in
    batches of 100 so that no workspace is silently left out. For each batch
    the scan is started, polled until it finishes, and its result is
    downloaded and printed.

    Raises:
        RuntimeError: If a scan start response carries no id, or a scan ends
            with a status other than "Succeeded".
    """
    batch_size = 100  # start_scan submits at most this many IDs per request

    token = get_token()

    # Get modified workspaces or specify workspace IDs manually
    print("Fetching modified workspaces...")
    modified_url = f"{PBI_BASE}/admin/workspaces/modified"
    modified = pbi_get(modified_url, token, log_file_name="modified_workspaces.json")
    workspace_ids = [w.get("id") for w in modified if w.get("id")]
    print(f"Found {len(workspace_ids)} modified workspaces")
    if not workspace_ids:
        print("No workspaces to scan")
        return

    for offset in range(0, len(workspace_ids), batch_size):
        batch = workspace_ids[offset:offset + batch_size]

        # NOTE: start_scan logs to a fixed file name, so start_scan.json
        # holds the response of the most recent batch only.
        scan = start_scan(token, batch)
        scan_id = scan.get("id")
        if not scan_id:
            # Without an id there is nothing to poll; fail loudly instead of
            # polling a ".../scanStatus/None" URL.
            raise RuntimeError(f"Scan start response has no id: {scan}")
        print(f"Started scan: {scan_id}")

        status = wait_for_scan_complete(scan_id, token)
        print(f"Scan status: {status.get('status')}")
        if status.get("status") != "Succeeded":
            raise RuntimeError(f"Scan failed: {status}")

        result = download_scan_result(scan_id, token)

        # Print the raw scan result for debugging
        print("\n--- Raw Scan Result ---")
        print(json.dumps(result, indent=2))
        print("-----------------------\n")
        print(f"Full payload saved to {RAW_LOG_DIR}/scan_result_{scan_id}.json")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment