Skip to content

Instantly share code, notes, and snippets.

@Dminor7
Last active September 9, 2021 18:21
Show Gist options
  • Save Dminor7/d023c5f996ef8b70c444912b125c7f60 to your computer and use it in GitHub Desktop.
Save Dminor7/d023c5f996ef8b70c444912b125c7f60 to your computer and use it in GitHub Desktop.
Airflow DAG that sends a Slack notification listing the failed tasks of a DAG run.
from datetime import datetime
import uuid
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from airflow import DAG
def fail():
    """Always raise, so the task running this callable ends up failed.

    Raises:
        Exception: unconditionally, with a fixed test message.
    """
    raise Exception("Task failed intentionally for testing purpose")
def success():
    """Print ``success`` to stdout; used as a task callable that never fails."""
    print("success")
def task_fail_slack_alert(context):
    """Post a Slack message listing every failed task of the current DAG run.

    Written to be used as an Airflow ``on_failure_callback``. It looks up the
    DAG run of the task instance found in ``context``, collects the task
    instances whose state is ``State.FAILED`` and sends a Block Kit message
    through the ``slack-airflow`` webhook connection.

    Args:
        context: Callback context mapping. The ``ti`` key (task instance) is
            required; ``execution_date`` is read if present.

    Raises:
        KeyError: if ``context`` has no ``ti`` entry.
    """
    task_instance = context['ti']

    # Each entry is a Slack mrkdwn link: the task id, pointing at its log page.
    failed_tasks = [
        f"<{ti.log_url}|{ti.task_id}>"
        for ti in task_instance.get_dagrun().get_task_instances()
        if ti.state == State.FAILED
    ]

    dag_id = task_instance.dag_id
    exec_date = context.get('execution_date')

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            # A fresh UUID gives every message its own block_id.
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag_id} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                # NOTE(review): this host looks like a mirror of
                # raw.githubusercontent.com -- confirm it is the intended source.
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]

    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()
# Arguments applied to every task of the DAG defined below.
default_args = dict(owner='airflow')
# Test DAG with one task that always fails and one that succeeds, used to
# exercise the Slack failure notification.
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021, 8, 19),
    schedule_interval=None,
    # DAG-level callback: if multiple tasks fail, the message is sent only
    # once and lists all failed tasks. To send one message per failed task
    # instead, pass the callback in default_args.
    on_failure_callback=task_fail_slack_alert,
) as dag:
    # Raises on purpose so the failure callback has something to report.
    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail,
    )
    # Control task that only prints "success".
    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment