@MarkPryceMaherMSFT
Last active October 4, 2025 03:12
This code converts Spark views into T-SQL views on the lakehouse SQL analytics endpoint.
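For example, a Spark view body such as SELECT `id`, `name` FROM customers LIMIT 10 would be rewritten to roughly SELECT TOP (10) [id], [name] FROM customers; the exact output depends on the replacement rules in transform_sql_for_sqlserver below.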
%%pyspark
import sempy.fabric as fabric
import struct
import sqlalchemy
import pyodbc
import pandas as pd
from notebookutils import mssparkutils
# Function to return a SQLAlchemy ODBC engine, given a connection string, using integrated AAD auth to Fabric
def create_engine(connection_string: str):
    token = mssparkutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # pyodbc connection attribute for passing an access token
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}),
    )
import re

def transform_sql_for_sqlserver(sql_text: str) -> str:
    """
    Translate common Spark SQL syntax/features into T-SQL equivalents
    so Spark views can be recreated in SQL Server.
    """
    transformed = sql_text
    # --------------------------------------------
    # IDENTIFIERS
    # --------------------------------------------
    # Spark allows backticks for identifiers: `col` → [col]
    transformed = re.sub(r"`([^`]*)`", r"[\1]", transformed)
    # --------------------------------------------
    # LIMIT → TOP
    # --------------------------------------------
    limit_match = re.search(r"LIMIT\s+(\d+)", transformed, re.IGNORECASE)
    if limit_match:
        limit_val = limit_match.group(1)
        transformed = re.sub(r"(?i)^SELECT", f"SELECT TOP ({limit_val})", transformed, count=1)
        transformed = re.sub(r"(?i)\s+LIMIT\s+\d+", "", transformed)
    # --------------------------------------------
    # FUNCTIONS
    # --------------------------------------------
    replacements = {
        r"(?i)\bIFNULL\s*\(": "ISNULL(",
        r"(?i)\bNVL\s*\(": "ISNULL(",
        r"(?i)\bSIZE\s*\(": "LEN(",
        r"(?i)\bLCASE\s*\(": "LOWER(",
        r"(?i)\bUCASE\s*\(": "UPPER(",
        r"(?i)\bRAND\s*\(": "RAND(",           # same name, but may behave differently
        r"(?i)\bRLIKE\b": "LIKE",              # Spark regex match → fallback to LIKE
        r"(?i)\bSTRING\s*TYPE\b": "NVARCHAR(MAX)",
    }
    for pattern, repl in replacements.items():
        transformed = re.sub(pattern, repl, transformed)
    # --------------------------------------------
    # CAST differences
    # --------------------------------------------
    # Spark: CAST(col AS STRING) → SQL Server: CAST(col AS NVARCHAR(MAX))
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+STRING\)", r"CAST(\1 AS NVARCHAR(MAX))", transformed)
    # Spark: CAST(col AS DOUBLE) → SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+DOUBLE\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS FLOAT) → SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+FLOAT\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS BOOLEAN) → SQL Server: BIT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+BOOLEAN\)", r"CAST(\1 AS BIT)", transformed)
    # --------------------------------------------
    # DATE/TIME FUNCTIONS
    # --------------------------------------------
    # Spark: DATE_ADD(startDate, days) → SQL Server: DATEADD(DAY, days, startDate)
    # (the argument order differs, so capture and swap the arguments; simple, non-nested arguments only)
    transformed = re.sub(r"(?i)\bDATE_ADD\s*\(\s*([^,()]+?)\s*,\s*([^()]+?)\s*\)", r"DATEADD(DAY, \2, \1)", transformed)
    # Spark: DATE_SUB(startDate, days) → SQL Server: DATEADD(DAY, -days, startDate)
    transformed = re.sub(r"(?i)\bDATE_SUB\s*\(\s*([^,()]+?)\s*,\s*([^()]+?)\s*\)", r"DATEADD(DAY, -(\2), \1)", transformed)
    replacements_date = {
        r"(?i)\bCURRENT_DATE\b": "CAST(GETDATE() AS DATE)",
        r"(?i)\bCURRENT_TIMESTAMP\b": "GETDATE()",
        r"(?i)\bNOW\(\)": "GETDATE()",
        r"(?i)\bUNIX_TIMESTAMP\s*\(\)": "DATEDIFF(SECOND, '1970-01-01', GETUTCDATE())",
        r"(?i)\bFROM_UNIXTIME\s*\(": "DATEADD(SECOND,",   # FROM_UNIXTIME(epoch); the base date '1970-01-01' still needs to be appended
    }
    for pattern, repl in replacements_date.items():
        transformed = re.sub(pattern, repl, transformed)
    # --------------------------------------------
    # STRING FUNCTIONS
    # --------------------------------------------
    # Spark: INSTR(str, substr) → SQL Server: CHARINDEX(substr, str)
    # (the argument order is reversed; simple, non-nested arguments only)
    transformed = re.sub(r"(?i)\bINSTR\s*\(\s*([^,()]+?)\s*,\s*([^()]+?)\s*\)", r"CHARINDEX(\2, \1)", transformed)
    string_funcs = {
        r"(?i)\bSUBSTR\s*\(": "SUBSTRING(",    # note: T-SQL SUBSTRING always requires the length argument
        r"(?i)\bCONCAT_WS\s*\(": "CONCAT(",    # rough fallback that drops the separator; CONCAT_WS needs SQL Server 2017+
    }
    for pattern, repl in string_funcs.items():
        transformed = re.sub(pattern, repl, transformed)
    # --------------------------------------------
    # REMOVE unsupported Spark clauses
    # --------------------------------------------
    # Spark uses DISTRIBUTE BY, CLUSTER BY, SORT BY, etc., which have no T-SQL equivalent
    transformed = re.sub(r"(?i)\bDISTRIBUTE\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bCLUSTER\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bSORT\s+BY\b.*", "", transformed)
    return transformed.strip()
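
# Quick sanity check of the transform (illustrative only: the sample query below is made up,
# and the exact output depends on the replacement rules above).
sample_spark_sql = "SELECT `id`, IFNULL(`name`, 'n/a') AS name FROM customers LIMIT 10"
print(transform_sql_for_sqlserver(sample_spark_sql))
# Expected shape: SELECT TOP (10) [id], ISNULL([name], 'n/a') AS name FROM customers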
# Get the ODBC connection string for the default lakehouse in this notebook
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
dw_name = lakehouse_name

sql_end_point = fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['connectionString']
connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_end_point}"
print(f"connection_string={connection_string}")

engine = create_engine(connection_string)
with engine.connect() as alchemy_connection:
    # Execute T-SQL DDL/DML on the Fabric DW / SQL endpoint through a raw pyodbc connection
    connection = engine.raw_connection()
    cursor = connection.cursor()

    # Step 1: List the Spark views
    views_df = spark.sql("SHOW VIEWS")
    views = views_df.collect()

    for row in views:
        view_name = row['viewName']
        print(f"\n-- Processing view: {view_name}")

        # Step 2: Extract metadata with DESC EXTENDED
        desc_df = spark.sql(f"DESC EXTENDED {view_name}")
        desc = desc_df.collect()

        # Look for 'View Text' in the extended description
        view_text = None
        for d in desc:
            if d.col_name.strip().lower() == "view text":
                view_text = d.data_type  # contains the SQL query text
                break

        if view_text:
            # Step 3: Generate the CREATE VIEW statement
            print(view_text)
            view_text = transform_sql_for_sqlserver(view_text)
            tsql_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text}"
            print(tsql_stmt)

            # Step 4: Execute on SQL Server
            try:
                cursor.execute(tsql_stmt)
                connection.commit()
                print(f"-- Successfully created {view_name} on SQL Server")
            except Exception as e:
                print(f"-- Failed to create {view_name}: {e}")
        else:
            print(f"-- No view text found for {view_name}")

    connection.commit()
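
    # Optional sanity check (illustrative; assumes the signed-in identity can read the catalog):
    # list the views now visible on the SQL endpoint.
    cursor.execute("SELECT name FROM sys.views ORDER BY name")
    for r in cursor.fetchall():
        print(f"-- endpoint view: {r[0]}")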