@MarkPryceMaherMSFT
Created September 27, 2024 21:11
Test the performance of the sync between a Fabric Lakehouse table and its SQL analytics endpoint: the script below polls both engines until their row counts for a test table agree.
import pandas as pd
import struct
import sqlalchemy
import pyodbc
import notebookutils
import sempy.fabric as fabric
import time
from pyspark.sql import functions as fn
from datetime import datetime
def create_engine(connection_string: str):
    # Encode an Entra access token the way the ODBC driver expects:
    # a little-endian length prefix followed by the UTF-16-LE token bytes.
    token = notebookutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # connection attribute for passing an access token
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}),
    )
# Resolve the current workspace/lakehouse from the notebook's Spark config,
# then look up the SQL analytics endpoint via the Fabric REST API.
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
sql_endpoint = fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['connectionString']

connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;Encrypt=Yes;TrustServerCertificate=No"
print(f"connection_string={connection_string}")
engine = create_engine(connection_string)
#df = pd.read_sql_query("SELECT DB_NAME() AS [Current Database]; ", engine)
#display(df)
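# (Not in the original gist) Uncommenting the two lines above is a quick way to
# confirm the token-based connection works before starting the polling loop.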
## Instructions:
## 1. Create a lakehouse
## 2. Load the sample data (the 'publicholidays' sample table)
## 3. Run this cell
spark.sql("drop table if exists publicholidays2;")
spark.sql("create table publicholidays2 as select * from publicholidays limit 500;")
#lh = "demolake"
tablename = "publicholidays2"
#schema = "dbo"
sparkrc = 0
sqlrc= 1
i=1
now = datetime.now()
print("now =", now)
while sparkrc != sqlrc:
    print(i)  # current polling attempt
    i += 1
    try:
        # Cheap query to confirm the SQL endpoint connection is alive.
        dfs = pd.read_sql_query("SELECT DB_NAME() AS [Current Database]; ", engine)

        sparkquery = f"select count(*) as row_count from {tablename}"
        sqlquery = f"select count(*) as row_count from {tablename}"

        df = spark.sql(sparkquery)
        dfx = pd.read_sql_query(sqlquery, engine)

        df_pandas = df.toPandas()
        sparkrc = df_pandas['row_count'].values[0]
        sqlrc = dfx['row_count'].values[0]

        print(f"Spark table count {sparkrc} SQL count {sqlrc}")
    except Exception as e:
        # The endpoint may error briefly while the new table is still syncing.
        print("An error occurred:", str(e))
    time.sleep(5)

now = datetime.now()
print("now =", now)