Last active
October 1, 2020 02:29
-
-
Save momota10/f473a351e0517a5f01a59832547543e6 to your computer and use it in GitHub Desktop.
Airflowのタスクが成功もしくは失敗した時にslackに通知する実装のサンプル
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.operators.bash_operator import BashOperator | |
from airflow.operators.python_operator import PythonOperator | |
import os | |
import datetime | |
import slackweb | |
# Both settings are required: a missing variable raises KeyError as soon as
# this module is imported.

# URL handed to slackweb.Slack(url=...) by the notification callbacks.
WEBHOOK_URL = os.environ["WEBHOOK_URL"]

# URL used as the "title_link" of every Slack attachment sent by the callbacks.
AIRFLOW_URL = os.environ["AIRFLOW_URL"]
def notify_failure(context):
    """Post a red Slack attachment announcing that a task failed.

    Registered as ``on_failure_callback`` in ``default_args``.

    Args:
        context: Callback context mapping; only ``context["task"]`` is read,
            and it must expose a ``task_id`` attribute.
    """
    # Use the plain task_id instead of formatting the operator object itself.
    # In Airflow the operator's repr has the form "<Task(BashOperator): id>",
    # and Slack treats "<...>" as link/mention control syntax, which garbles
    # the attachment title.
    task_id = context["task"].task_id

    slack = slackweb.Slack(url=WEBHOOK_URL)
    attachments = [
        {
            "color": "#ff0000",
            "title": f"{task_id} is failed",
            "title_link": AIRFLOW_URL,
            # "<!channel>" is Slack's syntax for an @channel mention.
            "text": "<!channel>",
        }
    ]
    slack.notify(attachments=attachments)
def notify_success(context):
    """Post a green Slack attachment announcing that a task succeeded.

    Registered as ``on_success_callback`` in ``default_args``.

    Args:
        context: Callback context mapping; only ``context["task"]`` is read,
            and it must expose a ``task_id`` attribute.
    """
    # Use the plain task_id instead of formatting the operator object itself.
    # In Airflow the operator's repr has the form "<Task(BashOperator): id>",
    # and Slack treats "<...>" as link/mention control syntax, which garbles
    # the attachment title.
    task_id = context["task"].task_id

    slack = slackweb.Slack(url=WEBHOOK_URL)
    attachments = [
        {
            "color": "#008000",
            "title": f"{task_id} is success",
            "title_link": AIRFLOW_URL,
            # Empty text: success messages do not mention anyone.
            "text": "",
        }
    ]
    slack.notify(attachments=attachments)
def hello_python(**kwargs):
    """Print a fixed greeting; keyword arguments are accepted but unused."""
    greeting = "Hello Python"
    print(greeting)
# Arguments passed to the DAG as ``default_args``: a fixed start date plus the
# Slack callbacks to run when a task succeeds or fails.
default_args = dict(
    start_date=datetime.datetime(2018, 1, 1),
    on_success_callback=notify_success,
    on_failure_callback=notify_failure,
)
# Demo DAG with no schedule (schedule_interval=None) and catch-up disabled.
# Operators created inside the ``with`` block are attached to this DAG.
with DAG(
    dag_id="sample_dag",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    # NOTE(review): "ll" is usually a shell alias rather than a real command,
    # so this task presumably fails on purpose to exercise the failure
    # callback -- confirm before changing it.
    task_bash = BashOperator(
        bash_command="ll",
        task_id="task_bash",
    )

    task_python = PythonOperator(
        task_id="task_python",
        python_callable=hello_python,  # Python function to execute
        provide_context=True,  # set to True to pass arguments in at trigger time
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment