Skip to content

Instantly share code, notes, and snippets.

@7effrey89
Created January 6, 2026 12:59
Show Gist options
  • Select an option

  • Save 7effrey89/d1bb5ca5533f606800ca341ac495d72c to your computer and use it in GitHub Desktop.

Select an option

Save 7effrey89/d1bb5ca5533f606800ca341ac495d72c to your computer and use it in GitHub Desktop.
Powerbi Scanner API - returns assets in a Fabric Workspace - https://learn.microsoft.com/en-us/rest/api/power-bi/admin/workspace-info-get-scan-result
# Power BI Scanner API: returns the assets in a Fabric workspace.
# Reference: https://learn.microsoft.com/en-us/rest/api/power-bi/admin/workspace-info-get-scan-result
import time
import json
import os
import requests
from azure.identity import DefaultAzureCredential
# ========= CONFIG =========
# Scope string handed to the credential when requesting an access token.
SCOPE = "https://analysis.windows.net/powerbi/api/.default"
# Root of every REST URL built in this script.
PBI_BASE = "https://api.powerbi.com/v1.0/myorg"
SCAN_START = PBI_BASE + "/admin/workspaces/getInfo"


# When scan is started, API returns an id; we poll and then fetch results:
def SCAN_STATUS(scan_id):
    """Return the scan-status URL for *scan_id*."""
    return f"{PBI_BASE}/admin/workspaces/scanStatus/{scan_id}"


def SCAN_RESULT(scan_id):
    """Return the scan-result URL for *scan_id*."""
    return f"{PBI_BASE}/admin/workspaces/scanResult/{scan_id}"


# Directory that receives one JSON file per logged API response.
RAW_LOG_DIR = "raw_api_logs"

# ========= AUTH =========
# Created once at import time and shared by every token request.
credential = DefaultAzureCredential()
def get_token():
    """Return a bearer-token string for the Power BI scope.

    The token is requested from the module-level ``credential`` using
    ``SCOPE``; only the token string is returned.
    """
    return credential.get_token(SCOPE).token
def _write_log(file_name, payload):
    """Dump *payload* as indented JSON into ``RAW_LOG_DIR/file_name``.

    The log directory is created if it does not exist yet, and an
    existing file of the same name is overwritten.

    Returns:
        The path of the file that was written.
    """
    os.makedirs(RAW_LOG_DIR, exist_ok=True)
    target = os.path.join(RAW_LOG_DIR, file_name)
    with open(target, "w", encoding="utf-8") as handle:
        # ensure_ascii=False keeps non-ASCII characters readable in the log.
        json.dump(payload, handle, indent=2, ensure_ascii=False)
    return target
def pbi_post(url, token, payload=None, max_retries=5, log_file_name=None, timeout=60):
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Args:
        url: Full URL to call.
        token: Bearer token placed in the ``Authorization`` header.
        payload: JSON-serialisable request body; an empty object is sent
            when it is ``None`` or otherwise falsy.
        max_retries: Number of attempts made while the server answers 429.
        log_file_name: When given, the decoded response is also written to
            this file through ``_write_log``.
        timeout: Seconds passed to ``requests`` as the request timeout, so
            a stalled connection cannot block the script forever.

    Returns:
        The response body decoded from JSON.

    Raises:
        RuntimeError: On any status code >= 400 other than 429, or when
            every attempt was throttled.
    """
    headers = {"Authorization": f"Bearer {token}"}
    for attempt in range(max_retries):
        resp = requests.post(url, json=payload or {}, headers=headers, timeout=timeout)
        if resp.status_code == 429:
            # Retry-After may be an HTTP-date rather than a number of
            # seconds; fall back to the default delay instead of crashing.
            try:
                retry_after = max(0, int(resp.headers.get("Retry-After", "5")))
            except ValueError:
                retry_after = 5
            # Back off a little more on every consecutive throttle.
            time.sleep(retry_after + attempt)
            continue
        if resp.status_code >= 400:
            raise RuntimeError(f"POST {url} failed: {resp.status_code} {resp.text}")
        resp_json = resp.json()
        if log_file_name:
            _write_log(log_file_name, resp_json)
        return resp_json
    raise RuntimeError("Exceeded retries due to throttling.")
def pbi_get(url, token, max_retries=5, log_file_name=None, timeout=60):
    """GET *url* and return the decoded JSON response.

    Args:
        url: Full URL to call.
        token: Bearer token placed in the ``Authorization`` header.
        max_retries: Number of attempts made while the server answers 429.
        log_file_name: When given, the decoded response is also written to
            this file through ``_write_log``.
        timeout: Seconds passed to ``requests`` as the request timeout, so
            a stalled connection cannot block the script forever.

    Returns:
        The response body decoded from JSON.

    Raises:
        RuntimeError: On any status code >= 400 other than 429, or when
            every attempt was throttled.
    """
    headers = {"Authorization": f"Bearer {token}"}
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers, timeout=timeout)
        if resp.status_code == 429:
            # Retry-After may be an HTTP-date rather than a number of
            # seconds; fall back to the default delay instead of crashing.
            try:
                retry_after = max(0, int(resp.headers.get("Retry-After", "5")))
            except ValueError:
                retry_after = 5
            # Back off a little more on every consecutive throttle.
            time.sleep(retry_after + attempt)
            continue
        if resp.status_code >= 400:
            raise RuntimeError(f"GET {url} failed: {resp.status_code} {resp.text}")
        resp_json = resp.json()
        if log_file_name:
            _write_log(log_file_name, resp_json)
        return resp_json
    raise RuntimeError("Exceeded retries due to throttling.")
def start_scan(token, workspace_ids):
    """Start scan for specified workspace IDs (max 100 per request).

    Only the first 100 IDs are sent. When more are supplied a warning is
    printed so the truncation is not silent; callers that need every
    workspace scanned must call this once per batch of at most 100 IDs.

    Args:
        token: Bearer token for the Power BI API.
        workspace_ids: Sequence of workspace ID strings.

    Returns:
        The decoded JSON response of the scan-start request (also logged
        to ``start_scan.json``).
    """
    max_ids = 100  # API limit: max 100 workspace IDs
    if len(workspace_ids) > max_ids:
        print(
            f"Warning: {len(workspace_ids)} workspace IDs supplied; "
            f"only the first {max_ids} are included in this scan request"
        )
    print(f"Calling: {SCAN_START}")
    payload = {
        "workspaces": workspace_ids[:max_ids],
        "lineage": True,
        "datasourceDetails": False,
        "datasetSchema": False,
        "datasetExpressions": False,
        "getArtifactUsers": True,
    }
    print(f"Payload: {json.dumps(payload, indent=2)}")
    return pbi_post(SCAN_START, token, payload, log_file_name="start_scan.json")
def wait_for_scan_complete(scan_id, token, timeout_sec=900, poll_interval=5):
    """Poll the scan-status endpoint until the scan reaches a final state.

    Every status response is logged to its own numbered file. Polling
    stops as soon as the reported status is ``Succeeded`` or ``Failed``.

    Returns:
        The final status response.

    Raises:
        TimeoutError: If no final state is seen before *timeout_sec*
            seconds have elapsed.
    """
    give_up_at = time.time() + timeout_sec
    poll_number = 0
    while True:
        if time.time() >= give_up_at:
            raise TimeoutError("Scan did not complete in time")
        log_name = f"scan_status_{scan_id}_{poll_number}.json"
        status = pbi_get(SCAN_STATUS(scan_id), token, log_file_name=log_name)
        poll_number += 1
        if status.get("status") not in ("Succeeded", "Failed"):
            # Not finished yet: wait before asking again.
            time.sleep(poll_interval)
            continue
        return status
def download_scan_result(scan_id, token):
    """Fetch the result of scan *scan_id* and log it to a per-scan file.

    Returns:
        The decoded JSON scan result.
    """
    result_url = SCAN_RESULT(scan_id)
    log_name = f"scan_result_{scan_id}.json"
    return pbi_get(result_url, token, log_file_name=log_name)
def main():
    """Scan every modified workspace and print the raw scan results.

    The modified-workspace list is fetched once and then scanned in
    batches of at most 100 IDs (the per-request limit applied by
    ``start_scan``), so workspaces beyond the first 100 are no longer
    dropped. Each batch is started, polled to completion, downloaded and
    printed before the next one begins.

    Raises:
        RuntimeError: If a scan-start response carries no scan id, or a
            scan finishes with a status other than ``Succeeded``.
    """
    token = get_token()
    # Get modified workspaces or specify workspace IDs manually
    print("Fetching modified workspaces...")
    modified_url = f"{PBI_BASE}/admin/workspaces/modified"
    modified = pbi_get(modified_url, token, log_file_name="modified_workspaces.json")
    workspace_ids = [w.get("id") for w in modified if w.get("id")]
    print(f"Found {len(workspace_ids)} modified workspaces")
    if not workspace_ids:
        print("No workspaces to scan")
        return

    batch_size = 100  # matches the per-request limit used by start_scan
    batches = [
        workspace_ids[start:start + batch_size]
        for start in range(0, len(workspace_ids), batch_size)
    ]
    for batch_number, batch in enumerate(batches, start=1):
        if len(batches) > 1:
            print(f"Scanning batch {batch_number}/{len(batches)} ({len(batch)} workspaces)")
            # A multi-batch run can take a long time; ask for the token
            # again rather than reusing the one obtained at start-up.
            token = get_token()
        scan = start_scan(token, batch)
        scan_id = scan.get("id")
        if not scan_id:
            # Without an id, the status and result URLs would be meaningless.
            raise RuntimeError(f"Scan start response contained no id: {scan}")
        print(f"Started scan: {scan_id}")
        status = wait_for_scan_complete(scan_id, token)
        print(f"Scan status: {status.get('status')}")
        if status.get("status") != "Succeeded":
            raise RuntimeError(f"Scan failed: {status}")
        result = download_scan_result(scan_id, token)
        # Print the raw scan result for debugging
        print("\n--- Raw Scan Result ---")
        print(json.dumps(result, indent=2))
        print("-----------------------\n")
        print(f"Full payload saved to {RAW_LOG_DIR}/scan_result_{scan_id}.json")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment