Skip to content

Instantly share code, notes, and snippets.

@7effrey89
Last active December 9, 2025 09:09
Show Gist options
  • Select an option

  • Save 7effrey89/07e620053664ba76a4aea73bcfe984c6 to your computer and use it in GitHub Desktop.

Select an option

Save 7effrey89/07e620053664ba76a4aea73bcfe984c6 to your computer and use it in GitHub Desktop.
Demonstrate how to connect to the Livy Endpoint in Microsoft Fabric
"""
Documentation: https://learn.microsoft.com/en-us/fabric/data-engineering/get-started-api-livy
Fabric Livy API Sample Script
==============================
This script demonstrates how to use the Livy API to submit and execute
Spark session jobs in Microsoft Fabric.
Prerequisites:
- Fabric Premium or Trial capacity with a Lakehouse
- Enable the Tenant Admin Setting for Livy API (preview)
- Microsoft Entra app with appropriate permissions
- Install required packages: pip install msal requests
Required Entra App API Permissions:
- Lakehouse.Execute.All
- Lakehouse.Read.All
- Item.ReadWrite.All
- Workspace.ReadWrite.All
- Code.AccessStorage.All
- Code.AccessFabric.All
Fabric Workspace:
- Deployed Lakehouse
- The app registration (SPN) needs the Contributor role on the workspace
"""
import json
import time
import requests
from msal import ConfidentialClientApplication
# =============================================================================
# CONFIGURATION - Your Fabric and Entra App settings
# =============================================================================
# Microsoft Entra (Azure AD) Configuration
# The three values below are passed to get_access_token_with_secret() by main().
# NOTE(review): these look like placeholders that must be filled in before
# running; avoid committing a real client secret to source control.
TENANT_ID = "f" # Your Microsoft Entra tenant ID - update this!
# Application (client) ID of the Entra app registration.
CLIENT_ID = ""
# Client secret value of the Entra app registration.
CLIENT_SECRET = "B"
# Fabric Workspace and Lakehouse IDs
# Both IDs are interpolated into the Livy sessions URL built in main().
WORKSPACE_ID = ""
LAKEHOUSE_ID = ""
# API Configuration
# Root of the Fabric REST API; main() appends the /v1/workspaces/... path.
API_BASE_URL = "https://api.fabric.microsoft.com"
# Version segment placed after ".../livyapi/versions/" in the sessions URL.
# NOTE(review): the sample run output recorded alongside this script shows
# ".../livyapi/versions/2023-12-01/sessions" - confirm which version is intended.
LIVY_API_VERSION = "2024-07-30"
# Scope for client credentials flow (Service Principal)
# Passed as `scopes` to acquire_token_for_client() when requesting the token.
SCOPES = ["https://api.fabric.microsoft.com/.default"]
# =============================================================================
# AUTHENTICATION
# =============================================================================
def get_access_token_with_secret(tenant_id: str, client_id: str, client_secret: str) -> str:
    """
    Get an access token using client credentials (client secret).

    This uses the Service Principal (SPN) flow - no user interaction needed.
    The token is requested for the module-level ``SCOPES``.

    Args:
        tenant_id: Microsoft Entra tenant ID
        client_id: Application (client) ID
        client_secret: Client secret value

    Returns:
        Access token string

    Raises:
        ValueError: If any of the three credentials is empty.
        RuntimeError: If the token response does not contain an access token.
    """
    # Fail fast on unfilled placeholders instead of sending a doomed request.
    missing = [
        name
        for name, value in (
            ("tenant_id", tenant_id),
            ("client_id", client_id),
            ("client_secret", client_secret),
        )
        if not value
    ]
    if missing:
        raise ValueError(f"Missing required credential(s): {', '.join(missing)}")

    # ConfidentialClientApplication is already imported at module level,
    # so no function-local import is needed here.
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = ConfidentialClientApplication(
        client_id,
        authority=authority,
        client_credential=client_secret,
    )

    print("Acquiring token using client credentials...")
    token_response = app.acquire_token_for_client(scopes=SCOPES)

    if "access_token" not in token_response:
        # Surface the short error code as well as the description; the code
        # is what is searchable when the description is missing or vague.
        error_code = token_response.get('error', 'unknown_error')
        error_msg = token_response.get('error_description', 'Unknown error')
        raise RuntimeError(f"Authentication failed ({error_code}): {error_msg}")

    print("✓ Successfully acquired access token")
    return token_response["access_token"]
# =============================================================================
# LIVY SESSION MANAGEMENT
# =============================================================================
def create_livy_session(base_url: str, headers: dict, timeout: float = 30) -> tuple:
    """
    Create a new Livy Spark session.

    Args:
        base_url: The Livy API base URL for sessions
        headers: Request headers including authorization
        timeout: Seconds to wait for the HTTP request before giving up

    Returns:
        Tuple of (session_id, session_url)

    Raises:
        RuntimeError: If the API does not answer 200/202 or the response
            carries no session id.
        requests.RequestException: On network failure or request timeout.
    """
    print("\n" + "="*60)
    print("Creating Livy Spark session...")
    print("="*60)

    # An explicit timeout keeps the script from hanging forever when the
    # endpoint is unreachable (requests has no default timeout).
    response = requests.post(base_url, headers=headers, json={}, timeout=timeout)

    # 200 OK or 202 Accepted are both valid success responses
    if response.status_code not in (200, 202):
        raise RuntimeError(f"Failed to create session: {response.status_code} - {response.text}")

    session_info = response.json()
    session_id = session_info.get('id')
    if session_id is None:
        # Report the payload instead of failing with a bare KeyError.
        raise RuntimeError(f"Failed to create session: response has no 'id' - {response.text}")

    # Strip a trailing slash so the session URL never contains "//".
    session_url = f"{base_url.rstrip('/')}/{session_id}"

    print("✓ Session created successfully")
    print(f" Session ID: {session_id}")
    print(f" Session State: {session_info.get('state', 'starting')}")
    return session_id, session_url
def wait_for_session_idle(
    session_url: str,
    headers: dict,
    timeout_seconds: int = 300,
    poll_interval: float = 5,
    request_timeout: float = 30,
) -> None:
    """
    Wait for the Livy session to become idle (ready to accept statements).

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        timeout_seconds: Maximum total time to wait
        poll_interval: Seconds to sleep between status checks
        request_timeout: Seconds to wait for each individual HTTP request

    Raises:
        RuntimeError: If a status request fails or the session reaches a
            state from which it cannot become idle.
        TimeoutError: If the session is not idle within ``timeout_seconds``.
        requests.RequestException: On network failure or request timeout.
    """
    # States from which a session never returns to "idle"; waiting any
    # longer would only end in the timeout.
    terminal_states = {"dead", "error", "killed", "shutting_down", "success"}

    print("\nWaiting for session to become idle...")
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds

    while True:
        response = requests.get(session_url, headers=headers, timeout=request_timeout)
        if not response.ok:
            # Without this check an auth/404 error would be polled as state
            # "unknown" until the timeout, hiding the real cause.
            raise RuntimeError(
                f"Failed to get session status: {response.status_code} - {response.text}"
            )

        state = response.json().get('state', 'unknown')
        if state == "idle":
            print("✓ Session is now idle and ready")
            return
        if state in terminal_states:
            raise RuntimeError(f"Session entered terminal state: {state}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Timeout waiting for session to become idle")

        print(f" Session state: {state} - waiting...")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
def execute_spark_statement(
    session_url: str,
    headers: dict,
    code: str,
    kind: str = "pyspark",
    timeout_seconds: int = 600,
    poll_interval: float = 3,
    request_timeout: float = 30,
) -> dict:
    """
    Execute a Spark statement in the Livy session and wait for its result.

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        code: The Spark/PySpark code to execute
        kind: The type of code (pyspark, spark, sql)
        timeout_seconds: Maximum total time to wait for the statement to
            finish; raise it for long-running statements
        poll_interval: Seconds to sleep between status checks
        request_timeout: Seconds to wait for each individual HTTP request

    Returns:
        Statement result dictionary with the keys ``status``, ``data`` and
        ``execution_count`` taken from the statement's ``output``.

    Raises:
        RuntimeError: If submission or a status request fails, or the
            statement ends in the "error" or "cancelled" state.
        TimeoutError: If the statement does not finish in ``timeout_seconds``.
        requests.RequestException: On network failure or request timeout.
    """
    print("\n" + "-"*60)
    print("Executing Spark statement...")
    print("-"*60)
    print(f"Code: {code[:100]}{'...' if len(code) > 100 else ''}")

    statements_url = f"{session_url}/statements"
    payload = {
        "code": code,
        "kind": kind,
    }

    # Submit the statement
    response = requests.post(
        statements_url, headers=headers, json=payload, timeout=request_timeout
    )
    # Accept the usual "created/accepted" success codes, not only 200, in
    # line with how session creation treats 200 and 202.
    if response.status_code not in (200, 201, 202):
        raise RuntimeError(f"Failed to submit statement: {response.status_code} - {response.text}")

    statement_id = response.json()['id']
    statement_url = f"{statements_url}/{statement_id}"
    print(f"✓ Statement submitted (ID: {statement_id})")

    # Wait for statement to complete, bounded by a monotonic deadline so a
    # stuck statement cannot block the caller forever.
    print("Waiting for statement to complete...")
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = requests.get(statement_url, headers=headers, timeout=request_timeout)
        if not response.ok:
            raise RuntimeError(
                f"Failed to get statement status: {response.status_code} - {response.text}"
            )

        statement_status = response.json()
        state = statement_status.get('state', 'unknown')
        if state == "available":
            print("✓ Statement completed")
            break
        if state in ("error", "cancelled"):
            # Include whatever detail the service returned with the failure.
            details = statement_status.get('output')
            suffix = f" - {json.dumps(details)}" if details else ""
            raise RuntimeError(f"Statement failed with state: {state}{suffix}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Timeout waiting for statement {statement_id} to complete (last state: {state})"
            )

        print(f" Statement state: {state} - waiting...")
        time.sleep(min(poll_interval, remaining))

    # Extract and return results; `or {}` also covers an explicit null output.
    output = statement_status.get('output') or {}
    return {
        'status': output.get('status'),
        'data': output.get('data', {}),
        'execution_count': output.get('execution_count'),
    }
def delete_livy_session(session_url: str, headers: dict, timeout: float = 30) -> None:
    """
    Delete/close the Livy session to free up resources.

    This is best-effort cleanup: failures are reported on stdout and never
    raised, so calling it from a ``finally`` block cannot mask the error
    that triggered the cleanup.

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        timeout: Seconds to wait for the HTTP request before giving up
    """
    print("\n" + "="*60)
    print("Cleaning up - Deleting Livy session...")
    print("="*60)

    try:
        response = requests.delete(session_url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Network problems are handled like unexpected status codes below:
        # report and carry on.
        print(f"! Delete request failed: {exc}")
        return

    if response.status_code in [200, 204]:
        print("✓ Session deleted successfully")
    elif response.status_code == 404:
        print("! Session was already deleted or not found")
    else:
        print(f"! Delete returned status code: {response.status_code}")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
def _print_statement_output(result: dict) -> None:
    """Print a statement result: its plain-text output if present, else the raw JSON."""
    print("\n>>> OUTPUT <<<")
    data = result.get('data', {})
    if 'text/plain' in data:
        print(data['text/plain'])
    else:
        print(json.dumps(result, indent=2))


def main():
    """
    Main function to demonstrate Livy API usage.

    Authenticates with the configured service principal, creates a Livy
    session, waits for it to become idle, runs two sample PySpark
    statements and finally deletes the session (also when a step fails).

    Raises:
        Exception: Any error from the individual steps is printed and then
            re-raised after the session cleanup has run.
    """
    print("="*60)
    print("Fabric Livy API - Spark Session Demo")
    print("="*60)
    print(f"Workspace ID: {WORKSPACE_ID}")
    print(f"Lakehouse ID: {LAKEHOUSE_ID}")

    # Construct the Livy API URL
    livy_base_url = (
        f"{API_BASE_URL}/v1/workspaces/{WORKSPACE_ID}/lakehouses/{LAKEHOUSE_ID}/"
        f"livyapi/versions/{LIVY_API_VERSION}/sessions"
    )
    print(f"\nLivy API URL: {livy_base_url}")

    # Step 1: Authenticate using client secret
    print("\n" + "="*60)
    print("Step 1: Authentication (Client Credentials)")
    print("="*60)
    token = get_access_token_with_secret(TENANT_ID, CLIENT_ID, CLIENT_SECRET)

    # Set up headers for API calls
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    # Stays None until a session exists, so cleanup is skipped when session
    # creation itself fails.
    session_url = None
    try:
        # Step 2: Create a Livy session (the id is already printed by the
        # helper; only the URL is needed from here on)
        _session_id, session_url = create_livy_session(livy_base_url, headers)

        # Step 3: Wait for session to be ready
        wait_for_session_idle(session_url, headers)

        # Step 4: Execute a simple print statement
        print("\n" + "="*60)
        print("Step 4: Execute Simple Print Statement")
        print("="*60)
        # The literal is sent verbatim to the remote interpreter, so its
        # lines are deliberately kept at column 0.
        print_code = '''
print("="*50)
print("Hello from Fabric Spark via Livy API!")
print("="*50)
print(f"Spark Version: {spark.version}")
print(f"App Name: {spark.sparkContext.appName}")
print("This code is running on Microsoft Fabric!")
print("="*50)
'''
        result = execute_spark_statement(session_url, headers, print_code)
        _print_statement_output(result)

        # Step 5: Execute another example - basic Spark operation
        print("\n" + "="*60)
        print("Step 5: Execute Basic Spark DataFrame Operation")
        print("="*60)
        spark_code = '''
# Create a simple DataFrame
data = [
("Alice", 34, "Data Engineer"),
("Bob", 45, "Data Scientist"),
("Charlie", 29, "ML Engineer"),
("Diana", 38, "Analytics Lead")
]
columns = ["Name", "Age", "Role"]
df = spark.createDataFrame(data, columns)
print("Sample DataFrame created:")
df.show()
print(f"Row count: {df.count()}")
'''
        result = execute_spark_statement(session_url, headers, spark_code)
        _print_statement_output(result)

        print("\n" + "="*60)
        print("✓ All statements executed successfully!")
        print("="*60)
    except Exception as e:
        # Top-level boundary: report, then re-raise so the exit status
        # reflects the failure.
        print(f"\n✗ Error: {e}")
        raise
    finally:
        # Step 6: Clean up - delete the session
        if session_url:
            delete_livy_session(session_url, headers)
        print("\n" + "="*60)
        print("Demo completed!")
        print("="*60)


# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
PS C:\Git\Fabric-Livy-endpoint> python fabric_livy_sample.py
============================================================
Fabric Livy API - Spark Session Demo
============================================================
Workspace ID: f443e1d5-a88b-49f7-9284-bed9c225b3c8
Lakehouse ID: de6badf4-8ea2-4f0d-a635-abe1f1ad5b41
Livy API URL: https://api.fabric.microsoft.com/v1/workspaces/f443e1d5-a88b-49f7-9284-bed9c225b3c8/lakehouses/de6badf4-8ea2-4f0d-a635-abe1f1ad5b41/livyapi/versions/2023-12-01/sessions
============================================================
Step 1: Authentication (Client Credentials)
============================================================
C:\Users\jeffreylai\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\msal\application.py:204: UserWarning: Please upgrade msal-extensions. Only msal-extensions 1.2+ can work with msal 1.30+
warnings.warn(
Acquiring token using client credentials...
✓ Successfully acquired access token
============================================================
Creating Livy Spark session...
============================================================
✓ Session created successfully
Session ID: 16cb1d7d-b827-47f6-9e47-a6228ed87e7e
Session State: starting
Waiting for session to become idle...
Session state: starting - waiting...
✓ Session is now idle and ready
============================================================
Step 4: Execute Simple Print Statement
============================================================
------------------------------------------------------------
Executing Spark statement...
------------------------------------------------------------
Code:
print("="*50)
print("Hello from Fabric Spark via Livy API!")
print("="*50)
print(f"Spark Version: {...
✓ Statement submitted (ID: 1)
Waiting for statement to complete...
✓ Statement completed
>>> OUTPUT <<<
==================================================
Hello from Fabric Spark via Livy API!
==================================================
Spark Version: 3.5.5.5.4.20251103.2
App Name: sparksession
This code is running on Microsoft Fabric!
==================================================
============================================================
Step 5: Execute Basic Spark DataFrame Operation
============================================================
------------------------------------------------------------
Executing Spark statement...
------------------------------------------------------------
Code:
# Create a simple DataFrame
data = [
("Alice", 34, "Data Engineer"),
("Bob", 45, "Data Scie...
✓ Statement submitted (ID: 2)
Waiting for statement to complete...
Statement state: running - waiting...
✓ Statement completed
>>> OUTPUT <<<
Sample DataFrame created:
+-------+---+--------------+
| Name|Age| Role|
+-------+---+--------------+
| Alice| 34| Data Engineer|
| Bob| 45|Data Scientist|
|Charlie| 29| ML Engineer|
| Diana| 38|Analytics Lead|
+-------+---+--------------+
Row count: 4
============================================================
✓ All statements executed successfully!
============================================================
============================================================
Cleaning up - Deleting Livy session...
============================================================
✓ Session deleted successfully
============================================================
Demo completed!
============================================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment