Skip to content

Instantly share code, notes, and snippets.

@momota10
Last active October 1, 2020 02:29
Show Gist options
  • Save momota10/f473a351e0517a5f01a59832547543e6 to your computer and use it in GitHub Desktop.
Save momota10/f473a351e0517a5f01a59832547543e6 to your computer and use it in GitHub Desktop.
Airflowのタスクが成功もしくは失敗した時にslackに通知する実装のサンプル
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import os
import datetime
import slackweb
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
AIRFLOW_URL = os.environ["AIRFLOW_URL"]
def notify_failure(context):
slack = slackweb.Slack(url=WEBHOOK_URL)
attachments = [
{
"color": "#ff0000",
"title": f"{context['task']} is failed",
"title_link": AIRFLOW_URL,
"text": "<!channel>"
}
]
slack.notify(attachments=attachments)
def notify_success(context):
slack = slackweb.Slack(url=WEBHOOK_URL)
attachments = [
{
"color": "#008000",
"title": f"{context['task']} is success",
"title_link": AIRFLOW_URL,
"text": ""
}
]
slack.notify(attachments=attachments)
def hello_python(**kwargs):
print('Hello Python')
default_args = {
"start_date": datetime.datetime(2018, 1, 1),
"on_success_callback": notify_success,
"on_failure_callback": notify_failure,
}
with DAG(
"sample_dag",
schedule_interval=None,
catchup=False,
default_args=default_args) as dag:
task_bash = BashOperator(
task_id="task_bash",
bash_command="ll"
)
task_python = PythonOperator(
task_id="task_python",
python_callable=hello_python, # 実行するpythonの関数
provide_context=True # トリガー時に引数を渡す場合はTrueにする
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment