@MarkPryceMaherMSFT
Created August 11, 2025 13:11
"""
View Migration Script: SQL Server ➜ Fabric Warehouse/Lakehouse
--------------------------------------------------------------
This script copies one or more view definitions from a source SQL Server database
into a Fabric Warehouse or Lakehouse SQL endpoint.
Key Features:
- Connects to SQL Server using SQLAlchemy/pyodbc.
- Connects to Fabric using MSI authentication (access token).
- Reads view definitions from sys.views/sys.sql_modules in the source.
- Allows the user to specify which views to migrate.
- Replaces 'CREATE VIEW' with 'CREATE OR ALTER VIEW' for idempotent creation.
- Executes the migration with error handling, logging successes/failures.
- Writes any failed migrations to a CSV for retry.
"""
import pandas as pd
import re
import struct
import sqlalchemy
from sqlalchemy import create_engine, text
import pyodbc
import notebookutils
import sempy.fabric as fabric
# -------------------------------------------------
# Helper: Create a SQLAlchemy engine for Fabric SQL endpoint with MSI token
# -------------------------------------------------
def create_engine_alt(connection_string: str):
    """
    Creates a SQLAlchemy engine for the Fabric SQL endpoint, authenticating
    with a Managed Service Identity (MSI) access token.

    Args:
        connection_string (str): ODBC connection string for the Fabric SQL endpoint.

    Returns:
        sqlalchemy.Engine: Configured SQLAlchemy engine.
    """
    # Fetch a token for the Power BI/Fabric audience; the ODBC driver expects
    # it UTF-16-LE encoded and prefixed with its byte length.
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # ODBC pre-connect attribute for access tokens
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )
# -------------------------------------------------
# Get Fabric environment metadata (workspace/lakehouse info)
# -------------------------------------------------
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
# Fetch SQL endpoint connection string for the Fabric Lakehouse
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']
# -------------------------------------------------
# Source SQL Server details (replace with your secure config)
# -------------------------------------------------
source_sql_endpoint = "<SOURCE_SERVER_NAME>" # e.g., myserver.database.windows.net
source_database = "<SOURCE_DATABASE_NAME>"
username = "<SOURCE_USERNAME>"
password = "<SOURCE_PASSWORD>" # Load securely (e.g., environment var or key vault)
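# Hedged sketch (not part of the original script): in a Fabric notebook the
# password can be pulled from Azure Key Vault rather than hard-coded; the
# vault URL and secret name below are placeholders:
# password = notebookutils.credentials.getSecret(
#     "https://<YOUR_KEY_VAULT>.vault.azure.net/", "<SECRET_NAME>"
# )
# If the password contains URL-reserved characters (e.g. '@', ':', '/'),
# encode it before embedding it in the SQLAlchemy URL below:
# from urllib.parse import quote_plus
# password = quote_plus(password)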
# -------------------------------------------------
# Build connection strings
# -------------------------------------------------
# SQL Server connection string (SQL Authentication)
source_connection_string = (
    f"mssql+pyodbc://{username}:{password}@{source_sql_endpoint}/{source_database}"
    "?driver=ODBC+Driver+18+for+SQL+Server"
)
# Fabric SQL endpoint connection string (ODBC, MSI token later)
destination_connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;"
    "Encrypt=Yes;TrustServerCertificate=No"
)
# -------------------------------------------------
# Step 1: Read all view definitions from source SQL Server
# -------------------------------------------------
source_engine = create_engine(source_connection_string)
df_views = pd.read_sql_query("""
    SELECT
        v.name AS ViewName,
        m.definition AS ViewDefinition
    FROM sys.views v
    JOIN sys.sql_modules m ON v.object_id = m.object_id
    ORDER BY v.name;
""", source_engine)
display(df_views) # Optional: inspect all available views
# -------------------------------------------------
# Step 2: User specifies which views to migrate
# -------------------------------------------------
views_to_copy = ["ViewA", "ViewB", "MyDemoView"] # Example list
df_filtered = df_views[df_views['ViewName'].isin(views_to_copy)]
# -------------------------------------------------
# Step 3: Connect to Fabric SQL endpoint and migrate each view
# -------------------------------------------------
failed_views = [] # To track failed migrations
engine_dest = create_engine_alt(destination_connection_string)
with engine_dest.begin() as conn:
    for _, row in df_filtered.iterrows():
        view_name = row['ViewName']
        definition = row['ViewDefinition'].strip()
        # Replace the leading CREATE VIEW with CREATE OR ALTER VIEW
        # (case-insensitive); count=1 leaves any other occurrence of the
        # phrase inside the view body untouched
        create_or_alter_sql = re.sub(
            r"CREATE\s+VIEW",
            "CREATE OR ALTER VIEW",
            definition,
            count=1,
            flags=re.IGNORECASE
        )
        try:
            conn.execute(text(create_or_alter_sql))
            print(f"✅ Migrated view: {view_name}")
        except Exception as e:
            print(f"❌ Failed to migrate view: {view_name}")
            failed_views.append({
                "ViewName": view_name,
                "Error": str(e),
                "Definition": definition
            })
# -------------------------------------------------
# Step 4: Save failed migrations to CSV
# -------------------------------------------------
if failed_views:
    failed_df = pd.DataFrame(failed_views)
    failed_df.to_csv("/lakehouse/default/Files/failed_views.csv", index=False)
    print(f"⚠️ {len(failed_views)} views failed. Details saved to failed_views.csv")
else:
    print("✅ All views migrated successfully!")
print("✅ View migration process complete!")