MD Sync all lakehouses

Loops through every lakehouse in the current Fabric workspace, triggers a metadata sync (MD sync) of each lakehouse's SQL analytics endpoint via the refreshMetadata API, and appends the per-table sync results to a lakehouse_sync_status table.
import requests
import json
from datetime import datetime

from notebookutils import mssparkutils
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
from pyspark.sql.types import StructType, StructField, StringType

# Get a token for the Fabric / Power BI API
token = mssparkutils.credentials.getToken("pbi")

# Workspace ID and name (spark and mssparkutils are preconfigured in a Fabric notebook)
workspace_id = spark.conf.get("trident.workspace.id")
workspace_name = mssparkutils.env.getWorkspaceName()

# API endpoint for listing the lakehouses in this workspace
url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/lakehouses"

# Headers
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

# Define schema for the sync status table
schema = StructType([
    StructField("syncRunDateTime", StringType(), True),  # when this sync run was triggered
    StructField("workspace_name", StringType(), True),
    StructField("workspace_id", StringType(), True),
    StructField("lakehouse_name", StringType(), True),
    StructField("lakehouse_id", StringType(), True),
    StructField("sql_endpoint_id", StringType(), True),
    StructField("sync_status", StringType(), True),
    StructField("tableName", StringType(), True),
    StructField("status", StringType(), True),
    StructField("startDateTime", StringType(), True),
    StructField("endDateTime", StringType(), True),
    StructField("lastSuccessfulSyncDateTime", StringType(), True)
])

# Instantiate the Fabric REST client (handles auth and long-running operations)
client = fabric.FabricRestClient()

# List all lakehouses in the workspace
response = requests.get(url, headers=headers)
response.raise_for_status()
lakehouses = response.json()["value"]

# Loop through lakehouses
for lh in lakehouses:
    name = lh.get("displayName")
    lakehouse_id = lh.get("id")
    sql_props = lh.get("properties", {}).get("sqlEndpointProperties", {})
    sql_endpoint_id = sql_props.get("id")

    print(f"Lakehouse Name: {name}")
    print(f"Lakehouse ID: {lakehouse_id}")
    print(f"SQL Endpoint ID: {sql_endpoint_id}")
    print("-" * 40)

    # Skip lakehouses whose SQL endpoint has not been provisioned yet
    if not sql_endpoint_id:
        continue

    # This is the SQL endpoint to sync with the lakehouse; it needs to be the endpoint GUID
    uri = f"v1/workspaces/{workspace_id}/sqlEndpoints/{sql_endpoint_id}/refreshMetadata"

    # Example of setting a timeout
    payload = {"timeout": {"timeUnit": "Seconds", "value": "60"}}

    try:
        sync_run_time = datetime.utcnow().isoformat() + "Z"
        # lro_wait=True blocks until the long-running refresh operation completes
        response = client.post(uri, json=payload, lro_wait=True)
        sync_status = json.loads(response.text)
        # display(sync_status)

        # One record per table reported by the sync operation
        records = [
            (
                sync_run_time,
                workspace_name,
                workspace_id,
                name,
                lakehouse_id,
                sql_endpoint_id,
                str(response.status_code),  # sync_status: HTTP status of the refresh call
                item['tableName'],
                item['status'],
                item['startDateTime'],
                item['endDateTime'],
                item['lastSuccessfulSyncDateTime']
            )
            for item in sync_status['value']
        ]

        df = spark.createDataFrame(records, schema=schema)
        df.write.mode("append").saveAsTable("lakehouse_sync_status")
    except Exception as e:
        print(e)
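
After a run, the lakehouse_sync_status table can be read back to spot tables whose sync did not succeed. The snippet below is a minimal sketch, assuming the script above has written at least one run, that the notebook's default lakehouse holds the table, and that refreshMetadata reports "Success" for healthy tables; that status literal is an assumption and should be checked against the actual values logged.

from pyspark.sql import functions as F

# Read the log table written by the sync script above
log = spark.table("lakehouse_sync_status")

# Find the most recent run using the timestamp captured at sync time
latest_run = log.agg(F.max("syncRunDateTime")).collect()[0][0]

# Show tables from the latest run whose sync did not report success
# (assumption: any status other than "Success" indicates a problem)
(
    log.filter(F.col("syncRunDateTime") == latest_run)
       .filter(F.col("status") != "Success")
       .select("lakehouse_name", "tableName", "status", "lastSuccessfulSyncDateTime")
       .show(truncate=False)
)

Because the script appends on every run, the run timestamp filter keeps the check scoped to the latest sweep rather than historical rows.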