はじめに
マーケティング等でデータ活用を行うとき、データの収集や分析だけではなくデータ品質の管理も重要です。活用当初の品質はよくても、
- 収集元のデータ形式が変更された
- 収集処理の障害により抜け漏れが発生した
等の理由で徐々にデータの品質が低下する可能性があります。そのため、定期的にデータのバリデーションを行い、品質を管理することが必要です。データ活用プラットフォームのプロダクトの中には品質管理機能を内蔵しているものもあります1が、当該機能がない場合は別途専用のツールを導入することになります。
Great Expectations は、そのような別途導入が必要なケースで利用可能なデータ品質管理ツールの 1 つです。本記事では、Great Expectations について特徴や簡単な利用方法を紹介します。
Great Expectations の特徴
おもな特徴は次になります。
-
Python のライブラリとして提供
-
OSS (https://github.com/great-expectations/great_expectations)
-
SQL データベース、ファイル形式のデータ (CSV、JSON 等) に対応
-
メールや Slack 通知、Opsgenie によるアラート機能を提供
-
バリデーション結果のドキュメント化が可能
そのため、「無料のデータ品質管理ツールを導入したい」や「ETL の最後にバリデーション処理を実装したい」といった要望にマッチするツールだと思います。
Great Expectations を使ってみよう
それでは、Great Expectations を使って SQL データベースのバリデーションを行ってみます。
1. インストール
Great Expectations は Python ライブラリとして提供されていますので、pip
コマンドからインストールできます。また、SQL データベースのバリデーションを行う場合、SQLAlchemy のインストールも必要になります。本記事では、それぞれバージョン great_expectations==0.17.19
と SQLAlchemy==2.0.19
を使用します。
pip install --user great_expectations==0.17.19 SQLAlchemy==2.0.19
2. バリデーション
バリデーションは、Great Expectations のライブラリを利用して次のように実装・実行できます。
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.checkpoint import SimpleCheckpoint
# バリデーションの設定・結果の保存先として context を作成します
context = gx.get_context()
# バリデーション対象のデータベースに接続します
# - name に任意の名前、connection_string にデータベースへの接続情報を渡します
datasource = context.sources.add_sqlite(
name="sqlite_foo",
connection_string="sqlite:///foo.db"
)
# バリデーション対象のデータ (テーブル) を指定します。
# - name に任意の名前、table_name にテーブル名を渡します
batch_request = datasource.add_table_asset(
name="variables",
table_name="variables"
).build_batch_request()
expectation_suite_name = "variables_expectation_suite"
# 空のバリデーション内容 (Expectation Suite) を作ります
# - expectation_suite_name に任意の名前を渡します
# - 以降の処理で空の Expectation Suite に確認項目を追加します
suite = context.add_or_update_expectation_suite(
expectation_suite_name=expectation_suite_name
)
# ExpectationConfiguration インスタンスのリストでバリデーション内容を定義します
# - ExpectationConfiguration については後述します
expectations = [
ExpectationConfiguration(
expectation_type="expect_table_columns_to_match_set",
kwargs={
"column_set": [
"id",
"key",
"value",
],
"exact_match": False,
},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_of_type",
kwargs={
"column": "id",
"type_": "TEXT",
},
),
]
# 空の Expectation Suite に expectations リストを追加します
suite.add_expectation_configurations(expectations)
# Expectation Suite の設定内容を保存します
context.save_expectation_suite(suite)
# 以上の処理で設定した context (設定保存先)、batch_request (テーブルの指定)、
# Expectation Suite (バリデーション内容) でバリデーションを行うように指定します
# - name に任意の名前、data_context に context、batch_request に batch_request、
# expectation_suite_name に Expectation Suite の名前を渡します
checkpoint = SimpleCheckpoint(
name="variables_checkpoint",
data_context=context,
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# バリデーションを実行します。
checkpoint.run()
バリデーション内容の定義に使用している ExpectationConfiguration
クラスは、確認の種類 (型チェックや最大値・最小値のチェック、非 NULL チェックなど) と確認の種類に応じた設定を指定できます。コンストラクタで、expectation_type
に確認の種類、kwargs
に確認の設定を渡します。例えば、次のようにインスタンス化します。
ExpectationConfiguration(
# テーブルに指定のカラムが存在することを確認
expectation_type="expect_table_columns_to_match_set",
# - column_set でカラム名を指定
# - exact_match = True (デフォルト) の場合は指定のカラムのみが存在していれば成功、
# False の場合、指定カラム以外のカラムが存在していても成功
kwargs={
"column_set": [
"id",
"key",
"value",
],
"exact_match": False,
},
)
expectation_type
には様々な確認項目を指定できます。各項目の詳細は公式ドキュメント (https://greatexpectations.io/expectations/) から確認できます。一部を以下に紹介します。
expectation_type | 確認項目 |
---|---|
expect_table_columns_to_match_set | テーブルに指定カラムが存在すること |
expect_column_values_to_be_of_type | 指定カラムが指定のデータ型であること |
expect_column_values_to_not_be_null | 指定カラムに NULL が挿入されていないこと |
expect_column_values_to_match_regex | 指定カラムの値が指定の正規表現にマッチすること |
3. バリデーション結果の確認
バリデーションの結果は JSON 形式および HTML ドキュメントで出力されます。
JSON 形式の結果は、Python スクリプトから直接確認できます。
-
json["success"]: boolean
から全体のバリデーション結果 (成功 or 失敗) が確認できます。 -
json["run_results"]["validation_result"]["results"]: list
から各確認項目の結果や詳細 (不正を検出したレコード数など) を確認できます。
# バリデーションを実行します。
result = checkpoint.run()
# 結果を表示します。
print(result)
{
"success": false,
"run_results": {
"validation_result": {
"success": false,
"results": [
{
"success": true,
"expectation_config": {
"expectation_type": "expect_table_columns_to_match_set",
"kwargs": {
"column_set": [
"id",
"key",
"value"
],
"exact_match": false,
"batch_id": "sqlite_foo-variables"
},
"meta": {}
},
"result": {
"observed_value": [
"created_at",
"id",
"key",
"owner",
"value"
],
"details": {
"mismatched": {
"unexpected": [
"created_at",
"owner"
]
}
}
},
"meta": {},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
}
},
{
"success": false,
"expectation_config": {
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "id",
"type_": "TEXT",
"batch_id": "sqlite_foo-variables"
},
},
"result": {
"observed_value": "INTEGER"
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
}
}
],
"evaluation_parameters": {},
"statistics": {
"evaluated_expectations": 2,
"successful_expectations": 1,
"unsuccessful_expectations": 1,
"success_percent": 50.0
},
}
}
HTML ドキュメントでは、本記事の冒頭に載せたように JSON 形式と同じ内容を Web UI で確認できます。保存場所は次の例のように Python スクリプトから確認できます。例では、/var/tmp/tmpd45_9d50
ディレクトリに HTML ドキュメントが保存されています。
# バリデーション処理の冒頭で作成した context の get_docs_sites_urls を呼びます。
context.get_docs_sites_urls()
[{'site_name': 'local_site', 'site_url': 'file:///var/tmp/tmpd45_9d50/index.html'}]
4. アラート機能の利用
Great Expectations は、メールや Slack 通知、Opsgenie によるアラート機能を提供しています。例えば、Slack 通知をする場合は以下のスクリプトでバリデーションを実行します。
# slack_webhook で slack の Webhook URL、notify_on でいつ通知するかを指定します。
# - notify_on="all" の場合バリデーション実行後に必ず通知します。
# - notify_on="failure"、notify_on="success" の場合はそれぞれ失敗、成功したときに通知します。
checkpoint = SimpleCheckpoint(
name="variables_checkpoint_with_slack_notify",
data_context=context,
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
slack_webhook="https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
notify_on="all",
)
# バリデーションを実行します。
checkpoint.run()
バリデーション実行後、次のような通知が飛びます。
おわりに
本記事では、データ品質管理ツール Great Expectations を紹介しました。無料の Python ライブラリとして利用できるので、有料サービス以外でデータバリデーションを導入したい場合に採用候補になるのではと思います。
また、Great Expectations はバリデーションとドキュメント化以外にもプロファイリングという機能を備えています。プロファイリングはデータを走査して自動でバリデーション内容を作成・提案する機能で、少ない手間でバリデーション内容を作成することができます。こちらについては、また別の記事で紹介したいと思います。
参考
- Great Expectations 公式ドキュメント
-
例えば AI プラットフォームの Dataiku はデータ品質の管理機能を内臓しており、データの収集・分析・品質管理を一括で行えます。 ↩