Test how long it takes Delta Lake log publishing to make a SQL-side insert visible to Spark
import pandas as pd
import struct
import sqlalchemy
import pyodbc
import notebookutils
import sempy.fabric as fabric
from sqlalchemy import text
import time

# -----------------------------------------------------------------------------
# Helper function: Create a SQLAlchemy engine with Fabric token-based auth
# -----------------------------------------------------------------------------
def create_fabric_engine(connection_string: str) -> sqlalchemy.engine.Engine:
    """
    Create a SQLAlchemy engine for connecting to a Fabric SQL endpoint
    using token-based authentication.

    Args:
        connection_string (str): ODBC connection string.

    Returns:
        sqlalchemy.engine.Engine: SQLAlchemy engine configured for Fabric.
    """
    # Retrieve Fabric token
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")

    # Pack token into the binary structure required by ODBC
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # SQL Server-specific constant

    # Create and return the engine
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )

# -----------------------------------------------------------------------------
# Environment configuration
# -----------------------------------------------------------------------------
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")

# Get the SQL endpoint from the Fabric REST API
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']

# ODBC connection string
connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};"
    f"Server={sql_endpoint},1433;"
    f"Encrypt=Yes;TrustServerCertificate=No;"
    f"Database=dest"
)

# Create the engine
engine = create_fabric_engine(connection_string)
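
# -----------------------------------------------------------------------------
# Optional sanity check (not part of the original gist): a minimal sketch that
# confirms the token-based connection works before running the timing test.
# It only assumes the engine created above and the pandas import already present.
# -----------------------------------------------------------------------------
df_ping = pd.read_sql_query("SELECT 1 AS ok;", engine)
print(f"Connectivity check returned: {int(df_ping['ok'].iloc[0])}")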

# -----------------------------------------------------------------------------
# Table configuration
# -----------------------------------------------------------------------------
table_name = "sparktest"  # 👈 Change this to target your table

# -----------------------------------------------------------------------------
# Table paths and setup
# -----------------------------------------------------------------------------
# Ensure this table exists in the warehouse before running:
#   CREATE TABLE [dbo].[sparktest] ( [a] [int] NULL );

# Spark path (points to the Warehouse table in OneLake)
spark_path = (
    f"abfss://[email protected]/"
    f"dest.warehouse/Tables/dbo/{table_name}"
)  # 👈 Change the workspace ("dest") and warehouse ("dest.warehouse") to match your environment
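
# -----------------------------------------------------------------------------
# Optional check (not part of the original gist): read the initial Spark-side
# count so you can confirm Spark and SQL start from the same number before the
# insert. A minimal sketch that reuses the spark_path defined above.
# -----------------------------------------------------------------------------
initial_spark_count = spark.read.format('delta').load(spark_path).count()
print(f"Initial Spark count = {initial_spark_count}")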

# -----------------------------------------------------------------------------
# Insert a row via SQL
# -----------------------------------------------------------------------------
# Get initial row count
df_initial = pd.read_sql_query(f"SELECT COUNT(*) AS total FROM {table_name};", engine)
print(f"Initial SQL count = {int(df_initial['total'].iloc[0])}")

# Insert a row
with engine.begin() as conn:
    conn.execute(text(f"INSERT INTO {table_name} VALUES (66);"))

# Get updated SQL count
df_query_updated_via_sql = pd.read_sql_query(f"SELECT COUNT(*) AS total FROM {table_name};", engine)
sql_count = int(df_query_updated_via_sql['total'].iloc[0])
print(f"Updated SQL count = {sql_count}")

# -----------------------------------------------------------------------------
# Poll Spark until it reflects the SQL change (timed)
# -----------------------------------------------------------------------------
max_retries = 30   # Number of polling attempts
wait_seconds = 2   # Wait time between retries in seconds

print(f"⏳ Waiting for Spark to sync with SQL table [{table_name}]...")
start_time = time.time()  # Start timer

for attempt in range(max_retries):
    # Reload the Spark DataFrame each time (to avoid cached results)
    df_query_updated_via_spark = spark.read.format('delta').load(spark_path)
    spark_count = df_query_updated_via_spark.count()
    print(f"[Attempt {attempt+1}] Spark count = {spark_count}")

    if spark_count == sql_count:
        elapsed = time.time() - start_time
        print(f"✅ Spark is now in sync with SQL (took {elapsed:.2f} seconds).")
        break
    else:
        time.sleep(wait_seconds)
else:
    elapsed = time.time() - start_time
    print(f"⚠️ Timeout: Spark never matched SQL after waiting {elapsed:.2f} seconds.")