Submit Synapse Spark jobs via the REST API
#!/usr/bin/env python3
import logging
import time
from datetime import datetime
from subprocess import run, PIPE

import requests

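# Log to a UTC-timestamped file and echo everything to the console.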
now_file = f"{datetime.utcnow().strftime('%X').replace(':','-')}.log" | |
print(f"Log Filename: {now_file}") | |
# logging.basicConfig(filename=now_file,format="%(asctime)s:%(levelname)s:%(message)s") | |
| |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s [%(levelname)s] %(message)s", | |
handlers=[ | |
logging.FileHandler(now_file), | |
logging.StreamHandler() | |
] | |
) | |
# logging.info("This should be logged") | |
| |
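# Synapse workspace, Spark pool, and ADLS Gen2 paths used to build the REST endpoints.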
synapse_wksp = "synapse-wksp-01"
spark_pool = "sparkpool01"
synapse_wksp_stg = "synapsestorage"
synapse_wksp_container = "containerstorage"
batch_job_dir = f"abfss://{synapse_wksp_container}@{synapse_wksp_stg}.dfs.core.windows.net/synapse/workspaces/{synapse_wksp}/batchjobs/SparkJobDef2"
synapse_endpoint = f"https://{synapse_wksp}.dev.azuresynapse.net"
spark_batch_job_url = f"{synapse_endpoint}/livyApi/versions/2019-11-01-preview/sparkPools/{spark_pool}/batches"

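# Each workload maps a directory under batch_job_dir to the PySpark scripts it runs, in order.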
workload_1 = {"wl1": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]} | |
workload_2 = { "wl2": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]} | |
workload_3 = {"wl3": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]} | |
workload_4 = {"wl4": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]} | |
| |
wls = [workload_1,workload_4,workload_2,workload_3] | |
| |
# Get a bearer token for the Synapse dev endpoint via the Azure CLI (assumes `az login` has already been run).
def get_token():
    az_command = ['az', 'account', 'get-access-token', '--resource',
                  'https://dev.azuresynapse.net', '--query', 'accessToken']
    result = run(az_command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    bearer_token = result.stdout.strip().strip('"')
    return {"Authorization": f"Bearer {bearer_token}"}


auth_header = get_token()

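# Per-job Spark configuration overrides: wl3 step04/step05 get larger shuffle-partition counts
# (and step05 enables adaptive skew-join handling); all other jobs use default_conf.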
wl_3_job_4_conf_01 = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "15000",
    "spark.task.cpus": "2"
}

wl_3_job_5_conf_01 = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "5000",
    "spark.task.cpus": "2",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true"
}

default_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "500"
}

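# Spark job definition payload template; name, file, and conf are filled in per job before the PUT call in create_spark_job().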
payload = {
    "properties": {
        "targetBigDataPool": {
            "referenceName": spark_pool,
            "type": "BigDataPoolReference"
        },
        "requiredSparkVersion": "3.0",
        "language": "python",
        "jobProperties": {
            "name": None,
            "file": None,
            "conf": None,
            "numExecutors": 32,
            "executorCores": 16,
            "executorMemory": "112g",
            "driverCores": 16,
            "driverMemory": "112g"
        }
    }
}

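# Create (PUT) a Spark job definition named app_name that points at the given PySpark file and Spark conf.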
def create_spark_job(app_name, stg_file_path, conf):
    spark_job_create_url = f"{synapse_endpoint}/sparkJobDefinitions/{app_name}"
    payload['properties']['jobProperties']['name'] = app_name
    payload['properties']['jobProperties']['file'] = stg_file_path
    payload['properties']['jobProperties']['conf'] = conf
    # Job definitions only need to be created once before executing the jobs, so the auth token
    # does not need to be renewed very frequently here.
    # auth_header = get_token()
    # r = requests.delete(spark_job_create_url, params={"api-version": "2019-06-01-preview"}, headers=auth_header)  # Delete the job to ensure there is no old config
    # print(r.json())
    time.sleep(1)
    r = requests.put(spark_job_create_url, params={"api-version": "2019-06-01-preview"}, json=payload, headers=auth_header)
    response = r.json()
    logging.info(f"create_response_status: {r.status_code}\ncreate_response: {response}")

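# Execute an existing Spark job definition and return the Livy batch id, retrying until the submit is accepted (HTTP 202).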
def execute_spark_job(app_name):
    while True:
        try:
            auth_header = get_token()
            spark_job_submit_url = f"{synapse_endpoint}/sparkJobDefinitions/{app_name}/execute"
            submit_response = requests.post(spark_job_submit_url, params={"api-version": "2019-06-01-preview"}, headers=auth_header)
            if submit_response.status_code == 202:
                return submit_response.json()['id']
            logging.error(f"Error submitting job: {submit_response.json()}")
            time.sleep(2)
        except Exception as e:
            logging.error(f"Exception while submitting job: {e}")
            time.sleep(2)

# Run each workload's scripts sequentially as Spark batch jobs and wait for each one to finish.
for wl in wls:
    logging.info(f"{wl} - Start time: {datetime.utcnow()}")
    for dir, files_list in wl.items():
        for file_name in files_list:
            logging.info(f"File_name: {file_name}")
            if dir == "wl3" and file_name == 'step04.py':
                conf = wl_3_job_4_conf_01
            elif dir == "wl3" and file_name == 'step05.py':
                conf = wl_3_job_5_conf_01
            else:
                conf = default_conf
            app_name = file_name.split('.')[0] + "_dynamic"  # Use static for 2000 cores
            stg_file_path = f"{batch_job_dir}/{dir}/{file_name}"  # Construct the PySpark file path to submit
            # create_spark_job(app_name, stg_file_path, conf)  # Create the Spark job definition first
            livy_id = execute_spark_job(app_name)  # This job definition should already exist
            logging.info(f"Livy Id: {livy_id}")
            while True:
                try:
                    spark_batch_job_url_status = f"{spark_batch_job_url}/{livy_id}"
                    auth_header = get_token()
                    r = requests.get(spark_batch_job_url_status, params={"detailed": True}, headers=auth_header)
                    r_json = r.json()
                    state, result = r_json["state"], r_json.get("result")
                    if state == "success" and result == "Succeeded":
                        appId = r_json['appId']
                        appName = r_json['livyInfo']['jobCreationRequest']['name']
                        submittedAt, scheduledAt, endedAt = r_json['schedulerInfo']['submittedAt'], r_json['schedulerInfo']['scheduledAt'], r_json['schedulerInfo']['endedAt']
                        app_result = f"livy_id:{livy_id}\tappId:{appId}\tsubmittedAt:{submittedAt}\tscheduledAt={scheduledAt}\tendedAt:{endedAt}\n"
                        logging.info(app_result)
                        break
                    if state in ("dead", "error", "killed"):
                        # Treat standard Livy failure states as terminal so the monitor does not loop forever.
                        logging.error(f"Job {livy_id} finished unsuccessfully: state={state} result={result}")
                        break
                except Exception as e:
                    logging.error(f"Exception while monitoring job: {e}")
                time.sleep(30)  # Poll interval (arbitrary choice) to avoid hammering the API
    logging.info(f"{wl} - End time: {datetime.utcnow()}")