| """ | |
| View Migration Script: SQL Server ➜ Fabric Warehouse/Lakehouse | |
| -------------------------------------------------------------- | |
| This script copies one or more view definitions from a source SQL Server database | |
| into a Fabric Warehouse or Lakehouse SQL endpoint. | |
| Key Features: | |
| - Connects to SQL Server using SQLAlchemy/pyodbc. | |
| - Connects to Fabric using MSI authentication (access token). | |
| - Reads view definitions from sys.views/sys.sql_modules in the source. | |
| - Allows the user to specify which views to migrate. | |
| - Replaces 'CREATE VIEW' with 'CREATE OR ALTER VIEW' for idempotent creation. | |
| - Executes the migration with error handling, logging successes/failures. | |
| - Writes any failed migrations to a CSV for retry. | |
| """ | |
import re
import struct

import pandas as pd
import pyodbc
import sqlalchemy
from sqlalchemy import create_engine, text

# Fabric-notebook built-ins (pre-installed in Microsoft Fabric notebooks)
import notebookutils
import sempy.fabric as fabric
# -------------------------------------------------
# Helper: Create a SQLAlchemy engine for Fabric SQL endpoint with MSI token
# -------------------------------------------------
def create_engine_alt(connection_string: str):
    """
    Creates a SQLAlchemy engine for Fabric SQL endpoint authentication
    using Managed Service Identity (MSI) access token.

    Args:
        connection_string (str): ODBC connection string for Fabric SQL endpoint.

    Returns:
        sqlalchemy.Engine: Configured SQLAlchemy engine.
    """
    # Encode the token the way the ODBC driver expects: UTF-16-LE bytes,
    # prefixed with their byte length as a little-endian uint32.
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)

    # pyodbc pre-connect attribute for passing an Azure AD access token
    SQL_COPT_SS_ACCESS_TOKEN = 1256
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )
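
# Hedged alternative (assumption: running outside a Fabric notebook, where
# notebookutils is unavailable, and the azure-identity package is installed):
# the same token struct can be built from DefaultAzureCredential.
#
# from azure.identity import DefaultAzureCredential
# raw = DefaultAzureCredential().get_token(
#     "https://database.windows.net/.default"
# ).token.encode("UTF-16-LE")
# token_struct = struct.pack(f'<I{len(raw)}s', len(raw), raw)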
# -------------------------------------------------
# Get Fabric environment metadata (workspace/lakehouse info)
# -------------------------------------------------
# `spark` is the SparkSession pre-defined in Fabric notebooks.
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")

# Fetch SQL endpoint connection string for the Fabric Lakehouse
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']
# -------------------------------------------------
# Source SQL Server details (replace with your secure config)
# -------------------------------------------------
source_sql_endpoint = "<SOURCE_SERVER_NAME>"   # e.g., myserver.database.windows.net
source_database = "<SOURCE_DATABASE_NAME>"
username = "<SOURCE_USERNAME>"
password = "<SOURCE_PASSWORD>"                 # Load securely (e.g., environment var or key vault)

# -------------------------------------------------
# Build connection strings
# -------------------------------------------------
# SQL Server connection string (SQL Authentication)
source_connection_string = (
    f"mssql+pyodbc://{username}:{password}@{source_sql_endpoint}/{source_database}"
    "?driver=ODBC+Driver+18+for+SQL+Server"
)

# Fabric SQL endpoint connection string (ODBC; the MSI token is attached at
# connect time by create_engine_alt). Database names the Lakehouse so views
# land in the right database rather than the connection default.
destination_connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;"
    f"Database={lakehouse_name};Encrypt=Yes;TrustServerCertificate=No"
)
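
# Optional smoke test (a minimal sketch): fail fast with a clear error if the
# Fabric endpoint is unreachable, before any views are read or migrated.
with create_engine_alt(destination_connection_string).connect() as probe:
    probe.execute(text("SELECT 1"))
print("✅ Fabric SQL endpoint reachable")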
# -------------------------------------------------
# Step 1: Read all view definitions from source SQL Server
# -------------------------------------------------
source_engine = create_engine(source_connection_string)
df_views = pd.read_sql_query("""
    SELECT
        v.name AS ViewName,
        m.definition AS ViewDefinition
    FROM sys.views v
    JOIN sys.sql_modules m ON v.object_id = m.object_id
    ORDER BY v.name;
""", source_engine)

display(df_views)  # Optional: inspect all available views
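
# Schema note (assumption: view names are unique across schemas). sys.views.name
# is not schema-qualified, so two same-named views in different schemas would
# collide in the filter below. A schema-aware variant of the query:
#
# df_views = pd.read_sql_query("""
#     SELECT SCHEMA_NAME(v.schema_id) AS SchemaName,
#            v.name AS ViewName,
#            m.definition AS ViewDefinition
#     FROM sys.views v
#     JOIN sys.sql_modules m ON v.object_id = m.object_id
#     ORDER BY SchemaName, ViewName;
# """, source_engine)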
# -------------------------------------------------
# Step 2: User specifies which views to migrate
# -------------------------------------------------
views_to_copy = ["ViewA", "ViewB", "MyDemoView"]  # Example list
df_filtered = df_views[df_views['ViewName'].isin(views_to_copy)]
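
# Guard (a minimal sketch): surface requested views that do not exist in the
# source, so a typo in views_to_copy does not fail silently.
missing = set(views_to_copy) - set(df_filtered['ViewName'])
if missing:
    print(f"⚠️ Requested views not found in source: {sorted(missing)}")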
# -------------------------------------------------
# Step 3: Connect to Fabric SQL endpoint and migrate each view
# -------------------------------------------------
failed_views = []  # To track failed migrations
engine_dest = create_engine_alt(destination_connection_string)

# AUTOCOMMIT keeps each CREATE OR ALTER VIEW independent, so one failed
# statement cannot poison a shared transaction for the remaining views.
with engine_dest.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    for _, row in df_filtered.iterrows():
        view_name = row['ViewName']
        definition = row['ViewDefinition'].strip()

        # Replace the first CREATE VIEW with CREATE OR ALTER VIEW
        # (case-insensitive); count=1 avoids touching matches in the view body.
        create_or_alter_sql = re.sub(
            r"CREATE\s+VIEW",
            "CREATE OR ALTER VIEW",
            definition,
            count=1,
            flags=re.IGNORECASE
        )
        try:
            conn.execute(text(create_or_alter_sql))
            print(f"✅ Migrated view: {view_name}")
        except Exception as e:
            print(f"❌ Failed to migrate view: {view_name}")
            failed_views.append({
                "ViewName": view_name,
                "Error": str(e),
                "Definition": definition
            })
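
# Dependency-order retry (a sketch): a view that references another migrated
# view fails if it is created first. One extra pass over the failures often
# resolves such ordering issues; anything still failing goes to the CSV below.
if failed_views:
    still_failed = []
    with engine_dest.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        for item in failed_views:
            retry_sql = re.sub(
                r"CREATE\s+VIEW", "CREATE OR ALTER VIEW",
                item["Definition"], count=1, flags=re.IGNORECASE
            )
            try:
                conn.execute(text(retry_sql))
                print(f"✅ Migrated view on retry: {item['ViewName']}")
            except Exception as e:
                item["Error"] = str(e)
                still_failed.append(item)
    failed_views = still_failed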
# -------------------------------------------------
# Step 4: Save failed migrations to CSV
# -------------------------------------------------
if failed_views:
    failed_df = pd.DataFrame(failed_views)
    failed_df.to_csv("/lakehouse/default/Files/failed_views.csv", index=False)
    print(f"⚠️ {len(failed_views)} views failed. Details saved to failed_views.csv")
else:
    print("✅ All views migrated successfully!")

print("✅ View migration process complete!")