Last active
September 9, 2021 18:21
-
-
Save Dminor7/d023c5f996ef8b70c444912b125c7f60 to your computer and use it in GitHub Desktop.
Airflow DAG that sends a Slack notification listing the failed tasks of a DAG run.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.state import State
def fail() -> None:
    """Always raise, so the task that runs this callable ends up failed.

    Used as the ``python_callable`` of a test task to exercise the
    failure-notification path.

    Raises:
        Exception: unconditionally, with a fixed explanatory message.
    """
    raise Exception("Task failed intentionally for testing purpose")
def success() -> None:
    """Print ``success`` to standard output and return normally.

    Used as the ``python_callable`` of a test task that is expected to
    succeed alongside the intentionally failing one.
    """
    print("success")
def _build_failure_blocks(dag_id, exec_date, failed_tasks):
    """Build the Slack block list describing a failed DAG run.

    Args:
        dag_id: Identifier of the DAG, shown after ``*Dag*:``.
        exec_date: Execution date of the run, shown after
            ``*Execution Time*:`` (formatted with ``str()`` by the f-string).
        failed_tasks: Iterable of already-formatted strings, one per failed
            task; they are joined with ``", "``.

    Returns:
        A list of three ``section`` block dicts: a headline, the DAG/run
        details with an image accessory, and the failed-task list.
    """
    return [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed.",
            },
        },
        {
            "type": "section",
            # A fresh UUID per call keeps the block id unique per message.
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag_id} \n *Execution Time*: {exec_date}",
            },
            "accessory": {
                "type": "image",
                # NOTE(review): the host is raw.githubusercontent.com rather than
                # raw.githubusercontent.com -- confirm this URL is intended
                # and reachable from Slack.
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}",
            },
        },
    ]


def task_fail_slack_alert(context):
    """Send a Slack message listing every failed task of the current DAG run.

    Intended for use as an Airflow ``on_failure_callback``. The callback
    looks up the DAG run of the task instance found in ``context``, collects
    all of its task instances whose state is ``State.FAILED``, and posts one
    message through the ``slack-airflow`` webhook connection to
    ``#airflow-notifications``.

    Args:
        context: Airflow callback context. Must contain ``'ti'`` (the task
            instance); ``'execution_date'`` is read if present.

    Raises:
        KeyError: if ``context`` has no ``'ti'`` entry.
    """
    # 'ti' and 'task_instance' refer to the same object in the Airflow
    # context, so a single lookup serves both the DAG-run query and dag_id.
    ti = context['ti']

    # Each entry is "<log_url|task_id>" so the task id renders as a link to
    # that task's log inside the mrkdwn text.
    failed_tasks = [
        f"<{task.log_url}|{task.task_id}>"
        for task in ti.get_dagrun().get_task_instances()
        if task.state == State.FAILED
    ]

    blocks = _build_failure_blocks(
        dag_id=ti.dag_id,
        exec_date=context.get('execution_date'),
        failed_tasks=failed_tasks,
    )

    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",
        blocks=blocks,
        username='airflow',
    )
    failed_alert.execute()
# Arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
}

# Manually triggered test DAG (no schedule) with one task that always fails
# and one that succeeds, used to exercise the Slack failure notification.
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021, 8, 19),
    schedule_interval=None,
    # Registered at DAG level: if several tasks fail, a single message is
    # sent that lists all of the failed tasks. To send one message per
    # failed task instead, pass the callback in default_args.
    on_failure_callback=task_fail_slack_alert,
) as dag:
    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail,
    )
    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment