@yu-iskw
Last active July 18, 2017 14:36
Automating data quality checks with Apache Airflow + Slack (ref: http://qiita.com/yu-iskw/items/dc0561c15729bb390f9f)
{"timestamp":1500233640,"user_id":1234,"event_id":"view",...}
{"timestamp":1500233641,"user_id":4321,"event_id":"post",...}
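The sample records above are newline-delimited JSON, one event per line. To make the check concrete, here is a minimal local sketch that counts records whose `event_id` is missing or `null`, which is roughly the condition the BigQuery query tests. The helper name `count_null_event_ids` and the third sample record are hypothetical, and the elided fields of the real log are left out.

```python
import json
from typing import Iterable


def count_null_event_ids(lines: Iterable[str]) -> int:
    """Count NDJSON records whose event_id is absent or null (hypothetical helper)."""
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        # dict.get returns None both for a missing key and for an explicit null
        if record.get("event_id") is None:
            count += 1
    return count


if __name__ == "__main__":
    sample = [
        '{"timestamp": 1500233640, "user_id": 1234, "event_id": "view"}',
        '{"timestamp": 1500233641, "user_id": 4321, "event_id": "post"}',
        # hypothetical bad record, added only to show a non-zero count
        '{"timestamp": 1500233642, "user_id": 5678, "event_id": null}',
    ]
    print(count_null_event_ids(sample))  # → 1
```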
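from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.utils.trigger_rule import TriggerRule

# Placeholder: replace with your own Slack API token (ideally loaded from a secret store)
YOUR_SLACK_TOKEN = '<your-slack-api-token>'

# Example default arguments; the start_date is an illustrative value
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 7, 1),
}

dag = DAG(
    'bq_event_log_checker',
    default_args=default_args,
    schedule_interval='@daily')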
# Count the records whose event_id is NULL
# The expected result is 0
expected = 0
sql = """
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_{{ yesterday_ds_nodash }}
WHERE event_id IS NULL
"""
checker = BigQueryValueCheckOperator(
    dag=dag,
    task_id='bq_checker',
    bigquery_conn_id='bq_connection_id',
    sql=sql,
    pass_value=expected,
)
# Post a message to Slack when the BigQuery result does not match the expected value
slack = SlackAPIPostOperator(
    dag=dag,
    task_id='post_error_message_to_slack',
    token=YOUR_SLACK_TOKEN,
    channel='#data-quality',
    username='airflow',
    text='event_log on {{ yesterday_ds_nodash }} has record(s) whose event_id is null.',
    trigger_rule=TriggerRule.ALL_FAILED,
)
# Set the task dependency: the Slack task runs after the checker
checker.set_downstream(slack)
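The DAG above relies on two behaviours: the value check fails the `bq_checker` task when the query result differs from `pass_value`, and `TriggerRule.ALL_FAILED` lets the Slack task run only when its upstream task has failed. The following is a simplified, Airflow-free model of that control flow, not Airflow's actual implementation. `State`, `value_check_passes`, `all_failed`, and `slack_would_fire` are hypothetical names, and the real operator also supports a numeric `tolerance` parameter that this sketch ignores.

```python
from enum import Enum
from typing import Sequence


class State(Enum):
    """A small subset of task states, for illustration only."""
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def value_check_passes(first_row: Sequence[object], pass_value: object) -> bool:
    """Simplified value check: every cell of the first result row must equal pass_value."""
    return all(cell == pass_value for cell in first_row)


def all_failed(upstream_states: Sequence[State]) -> bool:
    """Simplified ALL_FAILED rule: True only if every upstream task failed.

    An empty upstream list is treated as "do not fire" in this sketch.
    """
    failed = (State.FAILED, State.UPSTREAM_FAILED)
    return bool(upstream_states) and all(s in failed for s in upstream_states)


def slack_would_fire(null_count: int, expected: int = 0) -> bool:
    """Model the two-task DAG: checker -> Slack notification."""
    checker_state = (
        State.SUCCESS if value_check_passes([null_count], expected) else State.FAILED
    )
    return all_failed([checker_state])


if __name__ == "__main__":
    print(slack_would_fire(0))  # → False: no NULL event_id, so no Slack message
    print(slack_would_fire(3))  # → True: the check fails, so the Slack task runs
```

Keeping the check and the notification in separate tasks means a bad day shows up in the Airflow UI as a failed `bq_checker` task, while Slack receives a message only in that case.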