Last active July 18, 2017 14:36
Automating data quality checks with Apache Airflow + Slack ref: http://qiita.com/yu-iskw/items/dc0561c15729bb390f9f
{"timestamp":1500233640,"user_id":1234,"event_id":"view",...}
{"timestamp":1500233641,"user_id":4321,"event_id":"post",...}
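The check that the DAG below runs in BigQuery can be tried locally on a few event-log lines first. The following is a minimal sketch, assuming newline-delimited JSON records shaped like the sample above; the function name `count_null_event_ids` and the sample records are hypothetical, and a record counts as bad when `event_id` is either missing or JSON `null`.

```python
import json
from typing import Iterable


def count_null_event_ids(lines: Iterable[str]) -> int:
    """Count records whose event_id is missing or JSON null (expected result: 0)."""
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        # dict.get returns None both for a missing key and for a JSON null value
        if record.get("event_id") is None:
            count += 1
    return count


# Hypothetical sample: two records like the ones above, plus one with a null event_id
sample = [
    '{"timestamp":1500233640,"user_id":1234,"event_id":"view"}',
    '{"timestamp":1500233641,"user_id":4321,"event_id":"post"}',
    '{"timestamp":1500233642,"user_id":5678,"event_id":null}',
]
print(count_null_event_ids(sample))  # 1
```

A non-zero count here corresponds to the situation in which the BigQuery check below is meant to fail and trigger the Slack notification.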
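from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.utils.trigger_rule import TriggerRule

# Placeholder values (assumptions): set your own start date, retry policy and Slack token
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 7, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
YOUR_SLACK_TOKEN = 'xoxp-your-slack-token'

dag = DAG(
    'bq_event_log_checker',
    default_args=default_args,
    schedule_interval='@daily')

# Count the records whose event_id is NULL
# The result is expected to be 0
expected = 0
# Assumes the JSON lines were loaded into columns, so event_id is a column.
# (JSON_EXTRACT needs both a JSON string and a path, e.g. JSON_EXTRACT(col, '$.event_id').)
sql = """
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_{{ yesterday_ds_nodash }}
WHERE event_id IS NULL
"""
checker = BigQueryValueCheckOperator(
    dag=dag,
    task_id='bq_checker',
    bigquery_conn_id='bq_connection_id',
    sql=sql,
    pass_value=expected,
)

# Post a message to Slack when the BigQuery result is not the expected value
slack = SlackAPIPostOperator(
    dag=dag,
    task_id='post_error_message_to_slack',
    token=YOUR_SLACK_TOKEN,
    channel='#data-quality',
    username='airflow',
    text='event_log on {{ yesterday_ds_nodash }} has record(s) whose event_id is null.',
    # Run only when the upstream check has failed
    trigger_rule=TriggerRule.ALL_FAILED
)

# Set the task dependency: the Slack task runs downstream of the check
checker.set_downstream(slack)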
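The interplay between the value check and the `ALL_FAILED` trigger rule can be illustrated without Airflow or BigQuery. The sketch below is a simplified, hypothetical model, not Airflow's actual code: `value_check` roughly mirrors the value-check idea (fail unless every value in the first result row equals `pass_value`, ignoring Airflow's `tolerance` option), and `all_failed` models the trigger rule as "every upstream task failed". The names `value_check`, `all_failed` and `run_pipeline` are illustrative only.

```python
from typing import Any, List, Optional, Sequence


class ValueCheckFailed(Exception):
    """Raised when the query result differs from the expected value."""


def value_check(first_row: Optional[Sequence[Any]], pass_value: float) -> None:
    """Simplified sketch: fail unless every value in the first row equals pass_value."""
    if not first_row:
        raise ValueCheckFailed("The query returned no rows")
    if not all(float(v) == float(pass_value) for v in first_row):
        raise ValueCheckFailed(f"Expected {pass_value}, got {list(first_row)}")


def all_failed(upstream_states: Sequence[str]) -> bool:
    """Simplified ALL_FAILED rule: run only if every upstream task failed."""
    return len(upstream_states) > 0 and all(s == "failed" for s in upstream_states)


def run_pipeline(first_row: Optional[Sequence[Any]], pass_value: float = 0) -> List[str]:
    """Run the check, then return the Slack messages that would be posted."""
    posted = []
    try:
        value_check(first_row, pass_value)
        state = "success"
    except ValueCheckFailed:
        state = "failed"
    if all_failed([state]):
        posted.append("event_log has record(s) whose event_id is null.")
    return posted


print(run_pipeline([0]))  # []
print(run_pipeline([3]))  # ['event_log has record(s) whose event_id is null.']
```

In Airflow itself, a downstream task whose trigger rule is not met is marked as skipped rather than run, so on a clean day the Slack task simply does not post anything.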