This code converts Spark views in a Fabric Lakehouse into T-SQL views on the Lakehouse's SQL analytics endpoint.
%%pyspark
import sempy.fabric as fabric
import struct
import re
import sqlalchemy
import pyodbc
import pandas as pd
from notebookutils import mssparkutils

# Return a SQLAlchemy engine (over pyodbc) for the given connection string,
# using the notebook's integrated Entra ID (AAD) token to authenticate to Fabric.
def create_engine(connection_string: str):
    token = mssparkutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # pyodbc pre-connect attribute for passing an access token
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}),
    )
def transform_sql_for_sqlserver(sql_text: str) -> str:
    """
    Translate common Spark SQL syntax/features into T-SQL equivalents
    so Spark views can be recreated in SQL Server.
    """
    transformed = sql_text

    # --------------------------------------------
    # IDENTIFIERS
    # --------------------------------------------
    # Spark quotes identifiers with backticks: `col` -> [col]
    transformed = re.sub(r"`([^`]+)`", r"[\1]", transformed)

    # --------------------------------------------
    # LIMIT -> TOP
    # --------------------------------------------
    limit_match = re.search(r"LIMIT\s+(\d+)", transformed, re.IGNORECASE)
    if limit_match:
        limit_val = limit_match.group(1)
        transformed = re.sub(r"(?i)^SELECT", f"SELECT TOP ({limit_val})", transformed, count=1)
        transformed = re.sub(r"(?i)\s+LIMIT\s+\d+", "", transformed)

    # --------------------------------------------
    # FUNCTIONS
    # --------------------------------------------
    replacements = {
        r"(?i)\bIFNULL\s*\(": "ISNULL(",
        r"(?i)\bNVL\s*\(": "ISNULL(",
        r"(?i)\bSIZE\s*\(": "LEN(",          # approximate: Spark SIZE is collection length, LEN is string length
        r"(?i)\bLCASE\s*\(": "LOWER(",
        r"(?i)\bUCASE\s*\(": "UPPER(",
        r"(?i)\bRAND\s*\(": "RAND(",         # same name, but may behave differently
        r"(?i)\bRLIKE\b": "LIKE",            # Spark regex match -> fallback to LIKE
        r"(?i)\bSTRING\s*TYPE\b": "NVARCHAR(MAX)",
    }
    for pattern, repl in replacements.items():
        transformed = re.sub(pattern, repl, transformed)

    # --------------------------------------------
    # CAST differences
    # --------------------------------------------
    # Spark: CAST(col AS STRING)  -> SQL Server: CAST(col AS NVARCHAR(MAX))
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+STRING\)", r"CAST(\1 AS NVARCHAR(MAX))", transformed)
    # Spark: CAST(col AS DOUBLE)  -> SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+DOUBLE\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS FLOAT)   -> SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+FLOAT\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS BOOLEAN) -> SQL Server: BIT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+BOOLEAN\)", r"CAST(\1 AS BIT)", transformed)

    # --------------------------------------------
    # DATE/TIME FUNCTIONS
    # --------------------------------------------
    replacements_date = {
        r"(?i)\bCURRENT_DATE\b": "CAST(GETDATE() AS DATE)",
        r"(?i)\bCURRENT_TIMESTAMP\b": "GETDATE()",
        r"(?i)\bNOW\(\)": "GETDATE()",
        r"(?i)\bUNIX_TIMESTAMP\s*\(\)": "DATEDIFF(SECOND, '1970-01-01', GETUTCDATE())",
    }
    for pattern, repl in replacements_date.items():
        transformed = re.sub(pattern, repl, transformed)
    # These functions also need their arguments reshuffled, so swap the captured
    # arguments explicitly. (Nested function calls inside the arguments are not handled.)
    # Spark: DATE_ADD(startDate, days) -> T-SQL: DATEADD(DAY, days, startDate)
    transformed = re.sub(r"(?i)\bDATE_ADD\s*\(\s*([^,()]+?)\s*,\s*([^()]+?)\s*\)", r"DATEADD(DAY, \2, \1)", transformed)
    # Spark: DATE_SUB(startDate, days) -> T-SQL: DATEADD(DAY, -(days), startDate)
    transformed = re.sub(r"(?i)\bDATE_SUB\s*\(\s*([^,()]+?)\s*,\s*([^()]+?)\s*\)", r"DATEADD(DAY, -(\2), \1)", transformed)
    # Spark: FROM_UNIXTIME(epoch) -> T-SQL: DATEADD(SECOND, epoch, '1970-01-01')
    transformed = re.sub(r"(?i)\bFROM_UNIXTIME\s*\(\s*([^()]+?)\s*\)", r"DATEADD(SECOND, \1, '1970-01-01')", transformed)

    # --------------------------------------------
    # STRING FUNCTIONS
    # --------------------------------------------
    string_funcs = {
        r"(?i)\bSUBSTR\s*\(": "SUBSTRING(",   # note: T-SQL SUBSTRING always needs a length argument
        r"(?i)\bINSTR\s*\(": "CHARINDEX(",    # note: argument order differs (INSTR(str, substr) vs CHARINDEX(substr, str))
        r"(?i)\bCONCAT_WS\s*\(": "CONCAT(",   # note: CONCAT_WS's separator semantics are lost
    }
    for pattern, repl in string_funcs.items():
        transformed = re.sub(pattern, repl, transformed)

    # --------------------------------------------
    # REMOVE unsupported Spark clauses
    # --------------------------------------------
    # Spark-only clauses such as DISTRIBUTE BY, CLUSTER BY and SORT BY
    transformed = re.sub(r"(?i)\bDISTRIBUTE\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bCLUSTER\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bSORT\s+BY\b.*", "", transformed)

    return transformed.strip()
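# Quick sanity check of the translator (illustrative only; the table and column
# names below are made up). Expected output is roughly:
#   SELECT TOP (10) [name], CAST([amount] AS FLOAT) FROM sales
_sample_spark_sql = "SELECT `name`, CAST(`amount` AS DOUBLE) FROM sales LIMIT 10"
print(transform_sql_for_sqlserver(_sample_spark_sql))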
# Get the ODBC connection string for the default Lakehouse of this notebook
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
dw_name = lakehouse_name

sql_end_point = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']
connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_end_point}"
print(f"connection_string={connection_string}")

engine = create_engine(connection_string)
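# Quick connectivity check (illustrative only, not required by the rest of the
# script): SELECT @@VERSION is standard T-SQL and confirms the token/driver setup.
print(pd.read_sql("SELECT @@VERSION AS sql_version", engine))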
# Execute T-SQL DDL against the Fabric SQL endpoint through a raw DBAPI connection
connection = engine.raw_connection()
cursor = connection.cursor()

# Step 1: List views in the default Lakehouse
views_df = spark.sql("SHOW VIEWS")
views = views_df.collect()

for row in views:
    view_name = row['viewName']
    print(f"\n-- Processing view: {view_name}")

    # Step 2: Extract metadata with DESC EXTENDED
    desc_df = spark.sql(f"DESC EXTENDED {view_name}")
    desc = desc_df.collect()

    # Look for 'View Text' in the extended description
    view_text = None
    for d in desc:
        if d.col_name.strip().lower() == "view text":
            view_text = d.data_type  # contains the SQL query text
            break

    if view_text:
        # Step 3: Translate the view text and generate the CREATE VIEW statement
        print(view_text)
        view_text_tsql = transform_sql_for_sqlserver(view_text)
        tsql_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text_tsql}"
        print(tsql_stmt)

        # Step 4: Execute on SQL Server
        try:
            cursor.execute(tsql_stmt)
            connection.commit()
            print(f"-- Successfully created {view_name} on SQL Server")
        except Exception as e:
            print(f"-- Failed to create {view_name}: {e}")
    else:
        print(f"-- No view text found for {view_name}")

cursor.close()
connection.close()
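# Optional verification (illustrative only): list the views now visible on the
# SQL endpoint through the standard sys.views catalog view.
verify_df = pd.read_sql("SELECT name, create_date FROM sys.views ORDER BY name", engine)
print(verify_df)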