Last active
December 9, 2025 09:09
-
-
Save 7effrey89/07e620053664ba76a4aea73bcfe984c6 to your computer and use it in GitHub Desktop.
Demonstrate how to connect to the Livy Endpoint in Microsoft Fabric
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Documentation: https://learn.microsoft.com/en-us/fabric/data-engineering/get-started-api-livy | |
| Fabric Livy API Sample Script | |
| ============================== | |
| This script demonstrates how to use the Livy API to submit and execute | |
| Spark session jobs in Microsoft Fabric. | |
| Prerequisites: | |
| - Fabric Premium or Trial capacity with a Lakehouse | |
| - Enable the Tenant Admin Setting for Livy API (preview) | |
| - Microsoft Entra app with appropriate permissions | |
| - Install required packages: pip install msal requests | |
| Required Entra App API Permissions: | |
| - Lakehouse.Execute.All | |
| - Lakehouse.Read.All | |
| - Item.ReadWrite.All | |
| - Workspace.ReadWrite.All | |
| - Code.AccessStorage.All | |
| - Code.AccessFabric.All | |
| Fabric Workspace: | |
| - Deployed Lakehouse | |
| - The app registration (SPN) needs the Contributor role on the workspace | |
| """ | |
| import json | |
| import time | |
| import requests | |
| from msal import ConfidentialClientApplication | |
# =============================================================================
# CONFIGURATION - Your Fabric and Entra App settings
# =============================================================================
# NOTE: every ID/secret below is a placeholder and must be replaced before the
# script can run. Avoid committing a real client secret to source control.

# Microsoft Entra (Azure AD) Configuration
TENANT_ID = "f"  # Your Microsoft Entra tenant ID - update this!
CLIENT_ID = ""  # Application (client) ID of the Entra app registration
CLIENT_SECRET = "B"  # Client secret value of the Entra app registration

# Fabric Workspace and Lakehouse IDs (used to build the Livy sessions URL)
WORKSPACE_ID = ""
LAKEHOUSE_ID = ""

# API Configuration
API_BASE_URL = "https://api.fabric.microsoft.com"
# Version segment of the Livy endpoint path (.../livyapi/versions/<version>/sessions).
# "2023-12-01" is the version the recorded successful run of this script used.
LIVY_API_VERSION = "2023-12-01"

# Scope for client credentials flow (Service Principal)
SCOPES = ["https://api.fabric.microsoft.com/.default"]
| # ============================================================================= | |
| # AUTHENTICATION | |
| # ============================================================================= | |
def get_access_token_with_secret(tenant_id: str, client_id: str, client_secret: str) -> str:
    """
    Get an access token using client credentials (client secret).

    This uses the Service Principal (SPN) flow - no user interaction needed.
    The token is requested for the module-level ``SCOPES``.

    Args:
        tenant_id: Microsoft Entra tenant ID
        client_id: Application (client) ID
        client_secret: Client secret value

    Returns:
        Access token string

    Raises:
        ValueError: If any of the three credentials is empty.
        RuntimeError: If the token response does not contain an access token.
    """
    # Fail fast with a clear message when a configuration placeholder was left
    # empty, instead of surfacing a less obvious error from the token request.
    supplied = {
        "tenant_id": tenant_id,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    missing = [name for name, value in supplied.items() if not value]
    if missing:
        raise ValueError(f"Missing required credential(s): {', '.join(missing)}")

    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = ConfidentialClientApplication(
        client_id,
        authority=authority,
        client_credential=client_secret,
    )

    print("Acquiring token using client credentials...")
    token_response = app.acquire_token_for_client(scopes=SCOPES)

    if "access_token" not in token_response:
        # Report the short error code as well: the description alone may be absent.
        error_code = token_response.get("error", "unknown_error")
        error_msg = token_response.get("error_description", "Unknown error")
        raise RuntimeError(f"Authentication failed: {error_msg} (error: {error_code})")

    print("✓ Successfully acquired access token")
    return token_response["access_token"]
| # ============================================================================= | |
| # LIVY SESSION MANAGEMENT | |
| # ============================================================================= | |
def create_livy_session(base_url: str, headers: dict, request_timeout: float = 120.0) -> tuple:
    """
    Create a new Livy Spark session.

    Args:
        base_url: The Livy API base URL for sessions
        headers: Request headers including authorization
        request_timeout: Seconds to wait for the HTTP request before giving up

    Returns:
        Tuple of (session_id, session_url)

    Raises:
        RuntimeError: If the API answers with a status other than 200 or 202.
    """
    print("\n" + "=" * 60)
    print("Creating Livy Spark session...")
    print("=" * 60)

    # Drop a trailing slash so the derived session URL never contains "//".
    base_url = base_url.rstrip("/")

    # An explicit timeout is required: requests otherwise waits indefinitely.
    response = requests.post(base_url, headers=headers, json={}, timeout=request_timeout)

    # 200 OK or 202 Accepted are both valid success responses
    if response.status_code not in (200, 202):
        raise RuntimeError(f"Failed to create session: {response.status_code} - {response.text}")

    session_info = response.json()
    session_id = session_info["id"]
    session_url = f"{base_url}/{session_id}"

    print("✓ Session created successfully")
    print(f" Session ID: {session_id}")
    print(f" Session State: {session_info.get('state', 'starting')}")
    return session_id, session_url
def wait_for_session_idle(
    session_url: str,
    headers: dict,
    timeout_seconds: int = 300,
    poll_interval: float = 5.0,
    request_timeout: float = 30.0,
) -> None:
    """
    Wait for the Livy session to become idle (ready to accept statements).

    The session is polled until it reports state "idle".

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        timeout_seconds: Maximum time to wait
        poll_interval: Seconds to sleep between two polls
        request_timeout: Seconds to wait for each individual HTTP request

    Raises:
        RuntimeError: If the session reports state "dead", "error" or "killed".
        TimeoutError: If the session is not idle within ``timeout_seconds``.
    """
    print("\nWaiting for session to become idle...")

    # Monotonic clock: the elapsed time must not be affected by wall-clock changes.
    started = time.monotonic()

    while True:
        # Without a per-request timeout a single hung request would block
        # forever and the overall timeout below would never be evaluated.
        response = requests.get(session_url, headers=headers, timeout=request_timeout)
        state = response.json().get("state", "unknown")

        if state == "idle":
            print("✓ Session is now idle and ready")
            return
        if state in ("dead", "error", "killed"):
            raise RuntimeError(f"Session entered terminal state: {state}")
        if time.monotonic() - started > timeout_seconds:
            raise TimeoutError("Timeout waiting for session to become idle")

        print(f" Session state: {state} - waiting...")
        time.sleep(poll_interval)
def execute_spark_statement(
    session_url: str,
    headers: dict,
    code: str,
    kind: str = "pyspark",
    timeout_seconds: int = 600,
    poll_interval: float = 3.0,
    request_timeout: float = 30.0,
) -> dict:
    """
    Execute a Spark statement in the Livy session.

    The statement is submitted and then polled until it reports state
    "available".

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        code: The Spark/PySpark code to execute
        kind: The type of code (pyspark, spark, sql)
        timeout_seconds: Maximum time to wait for the statement to complete
        poll_interval: Seconds to sleep between two polls
        request_timeout: Seconds to wait for each individual HTTP request

    Returns:
        Statement result dictionary with the keys 'status', 'data' and
        'execution_count'. When the output status is "error" the keys
        'ename', 'evalue' and 'traceback' are added so the caller can report
        what went wrong.

    Raises:
        RuntimeError: If submission fails or the statement ends in state
            "error" or "cancelled".
        TimeoutError: If the statement does not complete within
            ``timeout_seconds``.
    """
    print("\n" + "-" * 60)
    print("Executing Spark statement...")
    print("-" * 60)
    # Only a preview of long code is echoed to keep the log readable.
    print(f"Code: {code[:100]}{'...' if len(code) > 100 else ''}")

    statements_url = f"{session_url}/statements"
    payload = {
        "code": code,
        "kind": kind,
    }

    # Submit the statement
    response = requests.post(
        statements_url, headers=headers, json=payload, timeout=request_timeout
    )
    # Accept the same success codes as session creation, plus 201 Created.
    if response.status_code not in (200, 201, 202):
        raise RuntimeError(f"Failed to submit statement: {response.status_code} - {response.text}")

    statement_id = response.json()["id"]
    statement_url = f"{statements_url}/{statement_id}"
    print(f"✓ Statement submitted (ID: {statement_id})")

    # Wait for statement to complete (bounded, so a stuck statement cannot
    # keep the script polling forever).
    print("Waiting for statement to complete...")
    started = time.monotonic()
    while True:
        response = requests.get(statement_url, headers=headers, timeout=request_timeout)
        statement_status = response.json()
        state = statement_status.get("state", "unknown")

        if state == "available":
            print("✓ Statement completed")
            break
        if state in ("error", "cancelled"):
            raise RuntimeError(f"Statement failed with state: {state}")
        if time.monotonic() - started > timeout_seconds:
            raise TimeoutError(f"Timeout waiting for statement {statement_id} to complete")

        print(f" Statement state: {state} - waiting...")
        time.sleep(poll_interval)

    # Extract and return results. "output" / "data" may be present but null,
    # so fall back to an empty dict instead of failing on None.
    output = statement_status.get("output") or {}
    result = {
        "status": output.get("status"),
        "data": output.get("data") or {},
        "execution_count": output.get("execution_count"),
    }
    if result["status"] == "error":
        # Keep the error details instead of silently dropping them.
        for key in ("ename", "evalue", "traceback"):
            result[key] = output.get(key)
    return result
def delete_livy_session(session_url: str, headers: dict, request_timeout: float = 30.0) -> None:
    """
    Delete/close the Livy session to free up resources.

    This is a best-effort cleanup step: any outcome is reported on stdout and
    no exception is raised for HTTP or network failures, so that calling it
    from a ``finally`` block cannot hide the error that triggered the cleanup.

    Args:
        session_url: URL of the Livy session
        headers: Request headers including authorization
        request_timeout: Seconds to wait for the HTTP request before giving up
    """
    print("\n" + "=" * 60)
    print("Cleaning up - Deleting Livy session...")
    print("=" * 60)

    try:
        response = requests.delete(session_url, headers=headers, timeout=request_timeout)
    except requests.RequestException as exc:
        # Connection problems and timeouts are reported, not propagated.
        print(f"! Failed to delete session: {exc}")
        return

    if response.status_code in (200, 204):
        print("✓ Session deleted successfully")
    elif response.status_code == 404:
        print("! Session was already deleted or not found")
    else:
        print(f"! Delete returned status code: {response.status_code}")
| # ============================================================================= | |
| # MAIN EXECUTION | |
| # ============================================================================= | |
def _print_statement_output(result: dict) -> None:
    """Print a statement result and raise if its output status is "error"."""
    print("\n>>> OUTPUT <<<")
    data = result.get("data", {})
    if "text/plain" in data:
        print(data["text/plain"])
    else:
        # No plain-text output: show the whole result so nothing is hidden.
        print(json.dumps(result, indent=2))

    # A statement can complete while its code failed; the output status tells.
    if result.get("status") == "error":
        raise RuntimeError("Spark statement reported an error (output status: error)")


def main():
    """
    Main function to demonstrate Livy API usage.

    Authenticates, creates a Livy session, waits until it is idle, runs two
    sample PySpark statements and always deletes the session afterwards.

    Raises:
        ValueError: If a required configuration constant is empty.
    """
    print("=" * 60)
    print("Fabric Livy API - Spark Session Demo")
    print("=" * 60)
    print(f"Workspace ID: {WORKSPACE_ID}")
    print(f"Lakehouse ID: {LAKEHOUSE_ID}")

    # Refuse to run with unfilled placeholders: an empty ID would only produce
    # a malformed URL or a confusing authentication error later on.
    settings = {
        "TENANT_ID": TENANT_ID,
        "CLIENT_ID": CLIENT_ID,
        "CLIENT_SECRET": CLIENT_SECRET,
        "WORKSPACE_ID": WORKSPACE_ID,
        "LAKEHOUSE_ID": LAKEHOUSE_ID,
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError(f"Missing configuration value(s): {', '.join(missing)}")

    # Construct the Livy API URL
    livy_base_url = (
        f"{API_BASE_URL}/v1/workspaces/{WORKSPACE_ID}/lakehouses/{LAKEHOUSE_ID}/"
        f"livyapi/versions/{LIVY_API_VERSION}/sessions"
    )
    print(f"\nLivy API URL: {livy_base_url}")

    # Step 1: Authenticate using client secret
    print("\n" + "=" * 60)
    print("Step 1: Authentication (Client Credentials)")
    print("=" * 60)
    token = get_access_token_with_secret(TENANT_ID, CLIENT_ID, CLIENT_SECRET)

    # Set up headers for API calls
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    # Stays None until a session exists, so cleanup only runs when needed.
    session_url = None
    try:
        # Step 2: Create a Livy session
        _, session_url = create_livy_session(livy_base_url, headers)

        # Step 3: Wait for session to be ready
        wait_for_session_idle(session_url, headers)

        # Step 4: Execute a simple print statement
        print("\n" + "=" * 60)
        print("Step 4: Execute Simple Print Statement")
        print("=" * 60)
        # Simple print statement. The remote code is kept at column 0 on
        # purpose: it is sent verbatim to the Spark session.
        print_code = '''
print("="*50)
print("Hello from Fabric Spark via Livy API!")
print("="*50)
print(f"Spark Version: {spark.version}")
print(f"App Name: {spark.sparkContext.appName}")
print("This code is running on Microsoft Fabric!")
print("="*50)
'''
        result = execute_spark_statement(session_url, headers, print_code)
        _print_statement_output(result)

        # Step 5: Execute another example - basic Spark operation
        print("\n" + "=" * 60)
        print("Step 5: Execute Basic Spark DataFrame Operation")
        print("=" * 60)
        spark_code = '''
# Create a simple DataFrame
data = [
    ("Alice", 34, "Data Engineer"),
    ("Bob", 45, "Data Scientist"),
    ("Charlie", 29, "ML Engineer"),
    ("Diana", 38, "Analytics Lead")
]
columns = ["Name", "Age", "Role"]
df = spark.createDataFrame(data, columns)
print("Sample DataFrame created:")
df.show()
print(f"Row count: {df.count()}")
'''
        result = execute_spark_statement(session_url, headers, spark_code)
        _print_statement_output(result)

        print("\n" + "=" * 60)
        print("✓ All statements executed successfully!")
        print("=" * 60)
    except Exception as e:
        # Report the failure, then re-raise so the exit status reflects it.
        print(f"\n✗ Error: {e}")
        raise
    finally:
        # Step 6: Clean up - delete the session
        if session_url:
            delete_livy_session(session_url, headers)
        print("\n" + "=" * 60)
        print("Demo completed!")
        print("=" * 60)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| PS C:\Git\Fabric-Livy-endpoint> python fabric_livy_sample.py | |
| ============================================================ | |
| Fabric Livy API - Spark Session Demo | |
| ============================================================ | |
| Workspace ID: f443e1d5-a88b-49f7-9284-bed9c225b3c8 | |
| Lakehouse ID: de6badf4-8ea2-4f0d-a635-abe1f1ad5b41 | |
| Livy API URL: https://api.fabric.microsoft.com/v1/workspaces/f443e1d5-a88b-49f7-9284-bed9c225b3c8/lakehouses/de6badf4-8ea2-4f0d-a635-abe1f1ad5b41/livyapi/versions/2023-12-01/sessions | |
| ============================================================ | |
| Step 1: Authentication (Client Credentials) | |
| ============================================================ | |
| C:\Users\jeffreylai\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\msal\application.py:204: UserWarning: Please upgrade msal-extensions. Only msal-extensions 1.2+ can work with msal 1.30+ | |
| warnings.warn( | |
| Acquiring token using client credentials... | |
| ✓ Successfully acquired access token | |
| ============================================================ | |
| Creating Livy Spark session... | |
| ============================================================ | |
| ✓ Session created successfully | |
| Session ID: 16cb1d7d-b827-47f6-9e47-a6228ed87e7e | |
| Session State: starting | |
| Waiting for session to become idle... | |
| Session state: starting - waiting... | |
| ✓ Session is now idle and ready | |
| ============================================================ | |
| Step 4: Execute Simple Print Statement | |
| ============================================================ | |
| ------------------------------------------------------------ | |
| Executing Spark statement... | |
| ------------------------------------------------------------ | |
| Code: | |
| print("="*50) | |
| print("Hello from Fabric Spark via Livy API!") | |
| print("="*50) | |
| print(f"Spark Version: {... | |
| ✓ Statement submitted (ID: 1) | |
| Waiting for statement to complete... | |
| ✓ Statement completed | |
| >>> OUTPUT <<< | |
| ================================================== | |
| Hello from Fabric Spark via Livy API! | |
| ================================================== | |
| Spark Version: 3.5.5.5.4.20251103.2 | |
| App Name: sparksession | |
| This code is running on Microsoft Fabric! | |
| ================================================== | |
| ============================================================ | |
| Step 5: Execute Basic Spark DataFrame Operation | |
| ============================================================ | |
| ------------------------------------------------------------ | |
| Executing Spark statement... | |
| ------------------------------------------------------------ | |
| Code: | |
| # Create a simple DataFrame | |
| data = [ | |
| ("Alice", 34, "Data Engineer"), | |
| ("Bob", 45, "Data Scie... | |
| ✓ Statement submitted (ID: 2) | |
| Waiting for statement to complete... | |
| Statement state: running - waiting... | |
| ✓ Statement completed | |
| >>> OUTPUT <<< | |
| Sample DataFrame created: | |
| +-------+---+--------------+ | |
| | Name|Age| Role| | |
| +-------+---+--------------+ | |
| | Alice| 34| Data Engineer| | |
| | Bob| 45|Data Scientist| | |
| |Charlie| 29| ML Engineer| | |
| | Diana| 38|Analytics Lead| | |
| +-------+---+--------------+ | |
| Row count: 4 | |
| ============================================================ | |
| ✓ All statements executed successfully! | |
| ============================================================ | |
| ============================================================ | |
| Cleaning up - Deleting Livy session... | |
| ============================================================ | |
| ✓ Session deleted successfully | |
| ============================================================ | |
| Demo completed! | |
| ============================================================ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment