Test to see how long Delta log publishing takes to make a change written via SQL visible to Spark
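# What this script does:
#   1. Build a SQLAlchemy engine for the Fabric SQL connection using a notebook token.
#   2. Insert a row into the target table via T-SQL and record the new row count.
#   3. Poll the table's Delta files in OneLake from Spark until the counts match,
#      timing how long Delta log publishing takes.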
import pandas as pd
import struct
import sqlalchemy
import pyodbc
import notebookutils
import sempy.fabric as fabric
from sqlalchemy import text
import time
# -----------------------------------------------------------------------------
# Helper function: Create a SQLAlchemy engine with Fabric token-based auth
# -----------------------------------------------------------------------------
def create_fabric_engine(connection_string: str) -> sqlalchemy.engine.Engine:
    """
    Create a SQLAlchemy engine for connecting to a Fabric SQL endpoint
    using token-based authentication.

    Args:
        connection_string (str): ODBC connection string.

    Returns:
        sqlalchemy.engine.Engine: SQLAlchemy engine configured for Fabric.
    """
    # Retrieve Fabric token
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")

    # Pack the token into the binary structure required by ODBC
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # SQL Server-specific connection attribute

    # Create and return the engine
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )
# -----------------------------------------------------------------------------
# Environment configuration
# -----------------------------------------------------------------------------
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
# Get SQL endpoint from Fabric REST API
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']
# ODBC connection string
connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};"
    f"Server={sql_endpoint},1433;"
    f"Encrypt=Yes;TrustServerCertificate=No;"
    f"Database=dest"
)
# Create engine
engine = create_fabric_engine(connection_string)
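# Optional sanity check (a sketch, not part of the original timing test): run a
# trivial query to confirm the token-based engine can actually reach the SQL endpoint.
df_ping = pd.read_sql_query("SELECT 1 AS ok;", engine)
print(f"SQL endpoint reachable: {int(df_ping['ok'].iloc[0]) == 1}")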
# -----------------------------------------------------------------------------
# Table configuration
# -----------------------------------------------------------------------------
table_name = "sparktest" # 👈 Change this to target the table
# -----------------------------------------------------------------------------
# Table paths and setup
# -----------------------------------------------------------------------------
# Ensure this table exists in the warehouse before running:
# CREATE TABLE [dbo].[sparktest] ( [a] [int] NULL );
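# Optional (a sketch, not in the original gist): create the table through the
# engine built above if it is missing. The IF OBJECT_ID guard is plain T-SQL and
# assumes the connection's database is the "dest" warehouse from the connection string.
with engine.begin() as conn:
    conn.execute(text(
        f"IF OBJECT_ID('dbo.{table_name}', 'U') IS NULL "
        f"CREATE TABLE [dbo].[{table_name}] ([a] [int] NULL);"
    ))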
# Spark path (points to the Warehouse table in OneLake)
spark_path = (
    f"abfss://[email protected]/"  # 👈 obfuscated placeholder; use your OneLake root, e.g. abfss://<workspace>@onelake.dfs.fabric.microsoft.com/
    f"dest.warehouse/Tables/dbo/{table_name}"
)  # 👈 Change this to target the table
# -----------------------------------------------------------------------------
# Insert a row via SQL
# -----------------------------------------------------------------------------
# Get initial row count
df_initial = pd.read_sql_query(f"SELECT COUNT(*) AS total FROM {table_name};", engine)
print(f"Initial SQL count = {int(df_initial['total'].iloc[0])}")
# Insert a row
with engine.begin() as conn:
    conn.execute(text(f"INSERT INTO {table_name} VALUES (66);"))
# Get updated SQL count
df_query_updated_via_sql = pd.read_sql_query(f"SELECT COUNT(*) AS total FROM {table_name};", engine)
sql_count = int(df_query_updated_via_sql['total'].iloc[0])
print(f"Updated SQL count = {sql_count}")
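# Optional check (a sketch): the SQL side should see its own insert immediately;
# only the Delta log published to OneLake (what Spark reads) may lag behind.
# This assert assumes no other session is writing to the table at the same time.
assert sql_count == int(df_initial['total'].iloc[0]) + 1, "SQL did not reflect the insert"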
# -----------------------------------------------------------------------------
# Poll Spark until it reflects the SQL change (timed)
# -----------------------------------------------------------------------------
max_retries = 30 # Number of polling attempts
wait_seconds = 2 # Wait time between retries in seconds
print(f"⏳ Waiting for Spark to sync with SQL table [{table_name}]...")
start_time = time.time() # Start timer
for attempt in range(max_retries):
    # Reload the Spark DataFrame each time (to avoid cached results)
    df_query_updated_via_spark = spark.read.format('delta').load(spark_path)
    spark_count = df_query_updated_via_spark.count()
    print(f"[Attempt {attempt+1}] Spark count = {spark_count}")

    if spark_count == sql_count:
        elapsed = time.time() - start_time
        print(f"✅ Spark is now in sync with SQL (took {elapsed:.2f} seconds).")
        break
    else:
        time.sleep(wait_seconds)
else:
    elapsed = time.time() - start_time
    print(f"⚠️ Timeout: Spark never matched SQL after waiting {elapsed:.2f} seconds.")
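# Optional follow-up (a sketch): when the counts match, confirm the inserted value
# itself is visible to Spark, not just the row count. Column [a] comes from the
# CREATE TABLE statement above.
if spark_count == sql_count:
    visible = df_query_updated_via_spark.filter("a = 66").count()
    print(f"Rows with a = 66 visible to Spark: {visible}")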