Skip to content

Instantly share code, notes, and snippets.

@dylanberry
Last active May 7, 2024 12:56
Show Gist options
  • Save dylanberry/fce1548a6c8f96ed8e1916dca5376a62 to your computer and use it in GitHub Desktop.
Save dylanberry/fce1548a6c8f96ed8e1916dca5376a62 to your computer and use it in GitHub Desktop.
Call a Databricks job from Azure Data Factory and wait for the job to complete.
{
"name": "util-run-databricks-job",
"properties": {
"description": "Databricks Application ID: 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d",
"activities": [
{
  "name": "Execute Jobs API",
  "type": "WebActivity",
  "dependsOn": [
    { "activity": "Set JobID", "dependencyConditions": ["Succeeded"] }
  ],
  "policy": {
    "timeout": "7.00:00:00",
    "retry": 0,
    "retryIntervalInSeconds": 30,
    "secureInput": false,
    "secureOutput": false
  },
  "userProperties": [],
  "typeProperties": {
    "method": "POST",
    "url": {
      "type": "Expression",
      "value": "@concat('https://',pipeline().globalParameters.databricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/run-now')"
    },
    "body": {
      "type": "Expression",
      "value": "@if(empty(pipeline().parameters.Parameters), \nconcat('{\"job_id\":',variables('JobID'),'}'), \nconcat('{\"job_id\":',variables('JobID'),\n',\"notebook_params\":', pipeline().parameters.Parameters,'}')\n)"
    },
    "authentication": {
      "type": "MSI",
      "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
    },
    "connectVia": {
      "type": "IntegrationRuntimeReference",
      "referenceName": "SelfHostedIntegrationRuntime"
    }
  }
},
{
  "name": "Wait Until Job Completes",
  "description": "Polls the Databricks run started by 'Execute Jobs API' until the JobStatus variable is no longer 'Running'.",
  "type": "Until",
  "dependsOn": [
    {
      "activity": "Execute Jobs API",
      "dependencyConditions": [
        "Succeeded"
      ]
    }
  ],
  "userProperties": [],
  "typeProperties": {
    "expression": {
      "value": "@not(equals(variables('JobStatus'),'Running'))",
      "type": "Expression"
    },
    "activities": [
      {
        "name": "Check Job Run API",
        "description": "GET jobs/runs/get for the run_id returned by 'Execute Jobs API'. A GET request carries no body.",
        "type": "WebActivity",
        "dependsOn": [],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "url": {
            "value": "@concat('https://',pipeline().globalParameters.databricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/get?run_id=',activity('Execute Jobs API').output.run_id)",
            "type": "Expression"
          },
          "connectVia": {
            "referenceName": "SelfHostedIntegrationRuntime",
            "type": "IntegrationRuntimeReference"
          },
          "method": "GET",
          "authentication": {
            "type": "MSI",
            "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
          }
        }
      },
      {
        "name": "Set Job Status",
        "description": "Maps every non-terminal life_cycle_state to 'Running' and SKIPPED to 'Skipped'; otherwise stores result_state, falling back to life_cycle_state when result_state is absent from the response.",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "Check Job Run API",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "variableName": "JobStatus",
          "value": {
            "value": "@if(\n    contains(\n        createArray('QUEUED', 'PENDING', 'RUNNING', 'TERMINATING', 'BLOCKED', 'WAITING_FOR_RETRY'),\n        activity('Check Job Run API').output.state.life_cycle_state\n    ),\n    'Running',\n    if(\n        equals(activity('Check Job Run API').output.state.life_cycle_state, 'SKIPPED'),\n        'Skipped',\n        coalesce(\n            activity('Check Job Run API').output.state?.result_state,\n            activity('Check Job Run API').output.state.life_cycle_state\n        )\n    )\n)",
            "type": "Expression"
          }
        }
      },
      {
        "name": "Wait to Recheck API_copy1",
        "type": "Wait",
        "dependsOn": [
          {
            "activity": "Set Job Status",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "waitTimeInSeconds": {
            "value": "@pipeline().parameters.WaitSeconds",
            "type": "Expression"
          }
        }
      }
    ],
    "timeout": "7.00:00:00"
  }
},
{
  "name": "List Jobs",
  "description": "GET jobs/list filtered server-side by the exact job name '<JobName>_<env>'. The unfiltered endpoint is paginated, so a job beyond the first page would otherwise never be found.",
  "type": "WebActivity",
  "dependsOn": [],
  "policy": {
    "timeout": "7.00:00:00",
    "retry": 0,
    "retryIntervalInSeconds": 30,
    "secureOutput": false,
    "secureInput": false
  },
  "userProperties": [],
  "typeProperties": {
    "url": {
      "value": "@concat('https://',pipeline().globalParameters.databricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/list?name=',encodeUriComponent(concat(pipeline().parameters.JobName, '_', pipeline().globalParameters.env)))",
      "type": "Expression"
    },
    "connectVia": {
      "referenceName": "SelfHostedIntegrationRuntime",
      "type": "IntegrationRuntimeReference"
    },
    "method": "GET",
    "authentication": {
      "type": "MSI",
      "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
    }
  }
},
{
  "name": "Filter Jobs By Name",
  "type": "Filter",
  "dependsOn": [
    { "activity": "Set JobName", "dependencyConditions": ["Succeeded"] }
  ],
  "userProperties": [],
  "typeProperties": {
    "items": {
      "type": "Expression",
      "value": "@activity('List Jobs').output.jobs"
    },
    "condition": {
      "type": "Expression",
      "value": "@equals(variables('JobName'), item().settings.name)"
    }
  }
},
{
  "name": "Set JobID",
  "type": "SetVariable",
  "dependsOn": [
    { "activity": "Filter Jobs By Name", "dependencyConditions": ["Succeeded"] }
  ],
  "userProperties": [],
  "typeProperties": {
    "variableName": "JobID",
    "value": {
      "type": "Expression",
      "value": "@string(first(activity('Filter Jobs By Name').output.Value).job_id)"
    }
  }
},
{
  "name": "Set JobName",
  "type": "SetVariable",
  "dependsOn": [
    { "activity": "List Jobs", "dependencyConditions": ["Succeeded"] }
  ],
  "userProperties": [],
  "typeProperties": {
    "variableName": "JobName",
    "value": {
      "type": "Expression",
      "value": "@concat(pipeline().parameters.JobName, '_', pipeline().globalParameters.env)"
    }
  }
},
{
  "name": "Fail if not successful",
  "type": "IfCondition",
  "dependsOn": [
    { "activity": "Wait Until Job Completes", "dependencyConditions": ["Succeeded"] }
  ],
  "userProperties": [],
  "typeProperties": {
    "expression": {
      "type": "Expression",
      "value": "@equals(variables('JobStatus'), 'SUCCESS')"
    },
    "ifFalseActivities": [
      {
        "name": "Fail1",
        "type": "Fail",
        "dependsOn": [],
        "userProperties": [],
        "typeProperties": {
          "errorCode": "0",
          "message": "Job Failed"
        }
      }
    ]
  }
}
],
"parameters": {
"JobName": {
"type": "string",
"defaultValue": "sample"
},
"WaitSeconds": {
"type": "int",
"defaultValue": 30
},
"Parameters": {
"type": "string"
}
},
"variables": {
"JobStatus": {
"type": "String",
"defaultValue": "Running"
},
"JobID": {
"type": "String"
},
"JobName": {
"type": "String"
}
},
"folder": {
"name": "common"
},
"annotations": [],
"lastPublishTime": "2021-10-26T17:26:15Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment