airflow

Apache Airflow + Slack でデータクオリティチェックを自動化する

More than 1 year has passed since last update.

データ分析をしている人にとっては,データのクオリティを日々モニタリングするのは頭痛の種です.最近のシリコンバレーでは,Data Engineer とは別に Data Quality Engineer というポジションで募集をしている企業もたまに見かけます.それぐらいデータの「質」には,気を配る必要がありリソースが掛かる分野といえるでしょう.とは言え,専任のポジションを設けるのはなかなか難しいので,それでもできるだけ簡単にモニターリングしたいです.

Airflow と Slack を組み合わせることで,データのクリティを日々モニタリングすることができます.データエンジニアリングにユニットテスト的な概念を持ち込めないかと模索してきましたが,Airflow + Slack は個人的には一つの理想形と言えます.


Apache Airflow とは?

Apache Airflow は,プログラムでジョブをスケジューリングしたりモニターするフレームワークです. Directed Acyclic Graph (DAG) としてタスクの依存関係を記述して,ワークフローを定義できます.


Airflow as Data Quality Checker

Airflow には,CheckOperator という「何か」をチェックするための Operator のサブクラスがあります. 例えば,BigQuery 向けの CheckOperator としては,つぎの2つが apache-airflow==1.8.2rc1 では実装されています.


  • BigQueryValueCheckOperator

  • BigQueryIntervalCheckOperator

BigQueryValueCheckOperator は BigQuery に投げたクエリの結果が期待した結果かどうかを判定できます. また BigQueryIntervalCheckOperator は,BigQuery に投げたクエリの結果が期待した範囲の値かどうかを判定できます.

こうした考え方は,ユニットテストに非常に近いと思います.ユニットテストであれば,失敗したテストがあったということでプログラムやシステムが期待した挙動になっていないことがわかります.ここでは結果が期待した値でなかったときに,Slack にメッセージを送ることで不具合を通知するようにしたいと思います.もちろんメールを送ったり,JIRA にチケットを作ったり別の方法で不具合を通知するのも良いでしょう.


サンプルコード

例えば,日々つぎのようなイベントログをJSON形式で収集しているとします.このイベントログでは,event_id のキーが必ず入っていることを期待するとします.

{"timestamp":1500233640,"user_id":1234,"event_id":"view",...}

{"timestamp":1500233641,"user_id":4321,"event_id":"post",...}

ここでは前日分のログの event_id が NULL になっているレコード数をカウントして,それが 0 でなければイベントログになんらかの不具合があると考えられます.そのようなチェックする処理を Airflow では,つぎのように記述できます.ここでは簡素化するために, import 文や default values などの記述は割愛しています.

dag = DAG(

'bq_event_log_checker',
default_args=default_args,
schedule_interval='@daily')

# event_id が NULL になっているレコード数をカウント
# 結果は 0 であることを期待する
expected = 0
sql = """
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_{{ yesterday_ds_nodash }}
WHERE JSON_EXTRACT(event_id) IS NULL
"""

checker = BigQueryValueCheckOperator(
dag=dag,
task_id='bq_checker',
bigquery_conn_id='bq_connection_id',
sql=sql,
pass_value=expected,
)

# BigQuery の結果が期待した値ではなかったとき Slack にメッセージを送る
slack = SlackAPIPostOperator(
dag=dag,
task_id='post_error_message_to_slack',
token=YOUR_SLACK_TOKEN,
channel='#data-quality',
username='airflow',
text='event_log on {{ yesterday_ds_nodash }} has record(s) whose event_id is null.',
trigger_rule=TriggerRule.ALL_FAILED
)

# タスクの依存関係を設定
checker.set_downstream(slack)