Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save anna-geller/8cdf6ed2e4f8f6fec2b0d0d3a46f3cd8 to your computer and use it in GitHub Desktop.
Save anna-geller/8cdf6ed2e4f8f6fec2b0d0d3a46f3cd8 to your computer and use it in GitHub Desktop.
import asyncio
import textwrap
from client import from_dict
from dotenv import load_dotenv
import os
# Load variables from a local .env file into the process environment
# (python-dotenv) before anything below reads os.environ, e.g. the
# BLOCK_DOCUMENT_ID lookup used for the notification action.
load_dotenv()
# Body template for the notification. The `{{ event.* }}` placeholders are
# kept as literal text here; nothing in this script substitutes them.
# NOTE(review): presumably rendered by the automation backend when the event
# fires -- confirm.
# The explicit `\n` escapes add an extra newline (a blank line) after each of
# the first four lines; the two URL lines are separated by a single newline.
# textwrap.dedent strips the common four-space margin, so the resulting string
# starts at column 0 on every line.
airflow_msg = textwrap.dedent(
    """
    The following DAG finished execution: `{{ event.payload.dag }}` with final state `{{ event.payload.final_state }}` \n
    DAG run ID: `{{ event.payload.dag_run_id }}` \n
    Task: `{{ event.payload.task }}` \n
    Event received at: `{{ event.received }}` \n
    Airflow DAG run URL: <{{ event.payload.log_url }}|link to the Airflow UI>
    Mark-success URL: <{{ event.payload.mark_success_url }}|link to mark the DAG run as successful>
    """
)
# Automation definition that is passed to `from_dict` when this file is run
# as a script.
# NOTE(review): the trigger/actions schema looks like a Prefect automation
# payload -- confirm the exact field semantics against the `client` module.
airflow_dag = {
    "name": "Airflow DAG Runs",
    "description": "Send a message to Slack when a DAG run succeeds or fails.",
    "trigger": {
        # Event name(s) to react to; must be identical to the emitted event name.
        "expect": ["airflow.dag"],
        "match_related": {
            "prefect.resource.id": "airflow.dag.*",
            "prefect.resource.role": "airflow",
        },
        "posture": "Reactive",
        "threshold": 1,
        "within": 10,
    },
    "actions": [
        {
            "type": "send-notification",
            # Evaluates to None when BLOCK_DOCUMENT_ID is absent from the
            # environment; no validation happens here.
            "block_document_id": os.getenv("BLOCK_DOCUMENT_ID"),
            "subject": "Airflow DAG run event",
            "body": airflow_msg,
        },
    ],
}
if __name__ == "__main__":
    # Only submit the automation when executed as a script, never on import.
    # `from_dict` comes from the local `client` module; it is driven with
    # asyncio.run, so it is expected to return a coroutine.
    # NOTE(review): presumably it creates the automation from the dict via an
    # API call -- confirm in `client`.
    asyncio.run(from_dict(airflow_dag))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment