6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Great Expectations(GE) によるノートブック型環境(Databricks)でのデータ品質保証方法のまとめ

Last updated at Posted at 2022-08-15

概要

本記事にて、Great Expectations(GE) によるノートブック型環境(Databricks)でのデータ品質保証方法を共有する。GE がツールの概要を提示した上で、コードとその実行結果を提示する。

実行コードとその出力結果を Github に配置してあり、次のリンクで確認が可能。

Great Expectations とは

Great Expectations (GE) とは、データに対する検証、ドキュメント化、および、プロファイリングにより、データ品質の保証と改善を支援する OSS の Python ライブラリである。データに対する品質保証条件(GE では Expectations と呼ぶ)に基づき、データソースへの検証を行い、検証結果をドキュメント化することができる。

image.png
引用元:Welcome | Great Expectations

Expectation を、次のように定義する。下記図では、passenger_count というカラムにおける値の最小が1であり、値の最大が6であることを確認している。このような Expectation を複数定義して、データ品質検証を行う。

image.png
引用元:Welcome | Great Expectations

生成されるドキュメントは、下記の項目が表示される HTML ファイルであり、静的サイトとしてホストすることができる。検証するごとにその時点でのファイルが生成され、過去にさかのぼって確認することも可能。

image.png
引用元:Welcome | Great Expectations

GE を利用する基本的な手順は次のようになっている。ドキュメントの構成も、手順がベースとなっている。

  1. セットアップ
  2. データへの接続
  3. Expectations の作成
  4. データの検証

下記図が、ドキュメントの構成。

image.png
引用元:Welcome | Great Expectations

検証できるデータソースには次のものがある。本記事後述の手順では、Spark Dataframe、および、Pandas Dataframeを利用している。

  • データベース(SQLAlchemy 経由)
  • Pandas Dataframe
  • Spark Dataframe

SQLAlchemy 経由で接続できるデータベースには、次のものが提示されている。

データソースごとに利用できる Expection が異なり、次のリンクが参考になる。

GE を利用する方法として、CLIによる方法とノートブック型環境による方法がある。本記事後述の手順では後者の方法を実施しており、次のドキュメントが参考となる。

データ検証後に、ドキュメントを作成するだけでなく、次のような Action を設定可能。

GE に関する基本的な情報は、次のリンクに記載されている。

# リンク 概要
1 Welcome 概要
2 Getting started with Great Expectations チュートリアル
3 Glossary of Terms 用語集
4 Customize your deployment Great Expectations 利用するための考慮事項
5 Explore Expectations Expectation 一覧
6 Community Page • Great Expectations コミュニティ関連
7 Case studies from Great Expectations ケーススタディ

ノートブック型環境(Databricks)での Great Expectations の実行

1. Greate Exceptions を利用するための事前準備

Great Expectations のインストール

%pip install great_expectations -q

image.png

データ(データフレーム)の準備

schema = '''
`VendorID` INT,
`tpep_pickup_datetime` TIMESTAMP,
`tpep_dropoff_datetime` TIMESTAMP,
`passenger_count` INT,
`trip_distance` DOUBLE,
`RatecodeID` INT,
`store_and_fwd_flag` STRING,
`PULocationID` INT,
`DOLocationID` INT,
`payment_type` INT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE,
`congestion_surcharge` DOUBLE
'''
 
src_files = [
    "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz",
#     "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-02.csv.gz",
]
 
tgt_df = (
    spark
    .read
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .option("inferSchema", "false")
    .load(src_files)
)

image.png

2. 基本的なデータ品質検証の実施

本章では、次の記事を参考にしている。

Great Expectations のセットアップ

import datetime
 
from ruamel import yaml
 
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
# root_directory の初期化
dbutils.fs.rm(root_directory_in_spark_api, True)
 
try:
    # ディレクトリを確認
    display(dbutils.fs.ls(root_directory_in_spark_api))
except:
    print('Directory is empty.')
# Great expectaions 利用時のエントリーポイントである Data Context を定義
# https://docs.greatexpectations.io/docs/terms/data_context/
 
# great_expectations.yml を参照せずに定義を実施
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)
 
# 利用状況の情報共有を提供を停止
# https://docs.greatexpectations.io/docs/reference/anonymous_usage_statistics/
context.anonymous_usage_statistics.enabled = False
# ディレクトリを確認
display(dbutils.fs.ls(root_directory_in_spark_api))

image.png

データへの接続

datasource_name = "taxi_datasource"
dataconnector_name = "databricks_df"
data_asset_name = "nyctaxi_tripdata_yellow_yellow_tripdata"
tgt_deploy_env = "prod"

datasource_config = {
    # データソースを定義
    # https://docs.greatexpectations.io/docs/terms/datasource
    "name": datasource_name,
    "class_name": "Datasource",
 
    # execution_engine を定義
    # https://docs.greatexpectations.io/docs/terms/execution_engine/
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
    },
 
    # データコネクターを定義
    # https://docs.greatexpectations.io/docs/terms/data_connector/
    "data_connectors": {
        dataconnector_name: {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
context.add_datasource(**datasource_config)
batch_request = RuntimeBatchRequest(
    datasource_name=datasource_name,
    data_connector_name=dataconnector_name,
    data_asset_name = data_asset_name,
    batch_identifiers={
        "some_key_maybe_pipeline_stage": tgt_deploy_env,
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": tgt_df},
)

image.png

Expectations を作成

checkpoint_config_name = "nyctaxi_tripdata_yellow_yellow_tripdata__checkpoint"
# チェックポイントを定義
checkpoint_config = {
    "name":checkpoint_config_name,
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "expectation_suite_name": expectation_suite_name,
    "run_name_template": "%Y%m%d-%H%M%S-yctaxi_tripdata_yellow_yellow_tripdata",
}
context.add_checkpoint(**checkpoint_config)

image.png

# checkpoints にファイルが作成されたことを確認
checkpoints_file_path = f'{root_directory_in_spark_api}/checkpoints/{checkpoint_config_name}.yml'
print(dbutils.fs.head(checkpoints_file_path))

checkpoint_result = context.run_checkpoint(
    checkpoint_name=checkpoint_config_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)

image.png

検証結果を確認

# 品質チェック結果を表示
checkpoint_result["success"]
# 品質チェック結果の HTML ファイルを表示
first_validation_result_identifier = (
    checkpoint_result.list_validation_result_identifiers()[0]
)
first_run_result = checkpoint_result.run_results[first_validation_result_identifier]
 
docs_path = first_run_result['actions_results']['update_data_docs']['local_site']
 
html = dbutils.fs.head(docs_path,)
 
displayHTML(html)

image.png

3. 品質エラーがある場合の動作検証

# エラーとなる expectation を追加
validator.expect_column_values_to_not_be_null(
    column="congestion_surcharge",
)
 
validator.save_expectation_suite(discard_failed_expectations=False)
checkpoint_result = context.run_checkpoint(
    checkpoint_name=checkpoint_config_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)
# 品質チェック結果のを表示
checkpoint_result["success"]
# 品質チェック結果の HTML ファイルを表示
first_validation_result_identifier = (
    checkpoint_result.list_validation_result_identifiers()[0]
)
first_run_result = checkpoint_result.run_results[first_validation_result_identifier]
 
docs_path = first_run_result['actions_results']['update_data_docs']['local_site']
 
html = dbutils.fs.head(docs_path,)
 
displayHTML(html)

image.png

4. データプロファイリング

本章では、次の記事を参考にしている。

from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
from great_expectations.dataset.sparkdf_dataset import  SparkDFDataset
from great_expectations.render.renderer import *
from great_expectations.render.view import DefaultJinjaPageView
basic_dataset_profiler = BasicDatasetProfiler()
from great_expectations.dataset.pandas_dataset import PandasDataset
gdf = PandasDataset(
    tgt_df
    .limit(1000)
    .toPandas()
) 

下記のコードにより、spark データフレームでも実行できるか、パフォーマンスに課題あり

from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
gdf = SparkDFDataset(
    tgt_df
    .limit(1000)
) 

print(gdf.spark_df.count())
gdf.spark_df.display()

image.png

# データを確認
print(gdf.count())
gdf.head()

image.png

from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
 
# データをプロファイリング
expectation_suite, validation_result = gdf.profile(BasicDatasetProfiler)
from great_expectations.render.renderer import (
    ProfilingResultsPageRenderer,
    ExpectationSuitePageRenderer,
)
from great_expectations.render.view import DefaultJinjaPageView
 
profiling_result_document_content = ProfilingResultsPageRenderer().render(validation_result)
expectation_based_on_profiling_document_content = ExpectationSuitePageRenderer().render(expectation_suite)

# HTML を生成
profiling_result_HTML = DefaultJinjaPageView().render(profiling_result_document_content) # type string or str
expectation_based_on_profiling_HTML = DefaultJinjaPageView().render(expectation_based_on_profiling_document_content)

displayHTML(profiling_result_HTML)

image.png

5. リソースのクリーンアップ

dbutils.fs.rm(root_directory_in_spark_api, True)

image.png

6
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?