2
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PySpark によるデータ品質に関する処理の開発実践

Last updated at Posted at 2022-10-05

概要

PySpark によるデータ品質に関する処理の開発実践について次の項目を説明する。

  1. データ品質概要
  2. PySpark(Python) によるデータ品質処理方法
  3. データ品質処理の実績

本記事のコードを含むノートブックを以下のリンクに保存してあり、Databricks 等の環境にインポートして実行が可能。

本記事の位置付け

次の開発ガイドシリーズにおけるデータ品質チェック分野の1記事であり、リンク先には記事にて記事の全体像を整理している。

GroupID 分野
T10 Spark概要
T20 データエンジニアリング
T30 データ品質チェック
T40 データサイエンス
T50 メタデータデプロイ
T60 テスト
T70 DevOps

1. データ品質概要

1-1. データ品質とは

データ品質とは、業務の現実を適切に反映されたデータを利用者のニーズを満たす程度。複数の定義があるが、データの利用時の要求を満たすことができるかが共通の観点のようである。

データ品質を、DMBOK を出版している DAMA では次のように定義されている。

data quality

The degree to which data is accurate, complete, timely, consistent with all requirements and business rules, and relevant for a given use.

引用元:The DAMA Dictionary of Data Management, 2nd Edition:

データ品質

データが正確で、完全で、タイムリーで、すべての要件とビジネスルールと一致しており、特定の用途に関連している度合い。

上記の翻訳

DMBOK 2nd では次のような記述がある。

データ品質の度合いはデータ利用者の期待と要求を満たす度合いである。

引用元:データマネジメント知識体系ガイド 第二版 第13章 データ品質 1.3 本質的な概念

SQuaRE(Systems and software Quality Requirements and Evaluation)では、次のように定義されている。

明示された条件下で使用するとき、明示されたニーズ及び暗黙のニーズをデータの特性が満足する度合い

引用元:JIS X 25012 ソフトウェア製品の品質要求及び評価(SQuaRE)-データ品質モデル

1-2. データ品質の方法論

データ品質の方法論を、チーム、ドメイン、グローバルなどの組織の規模に関わらず、次のように実施。

  • データ品質方針の策定を行い、優先順位が高いデータに対してデータ品質保証を、データ品質保証をプロジェクトとしてではなく、継続的な業務として実施する。
  • データ品質SLAやデータの認証を定めることで、組織内でのデータ品質に対するコンセンサスを得て、3M(ムリ・ムダ・ムラ)を取り除きながら、データオーナーと共にデータ品質保証を依頼する。
  • 組織のあるべき姿を成熟度モデル評価軸に落とし込み、その成熟度モデルに照らし合わせながら組織の改革改善に取り組む。

データ品質問題を発生させる原因は次のようなものがあり、データ品質ソリューションのITシステム導入のみでの対応できないことに注意が必要。

  • リーダーシップの欠如
  • システム設計
  • データ入力プロセス
  • データ処理機能
  • 問題の修復

データ品質に関する参考となるリンク集であるが、データ品質に関わる際には、概論に関する記事を読んだうえでデータマネジメント協会日本支部の資料をベースに実践していくことが推奨。

1-3. データ品質のメトリック

データ品質のメトリックを整理する際には、SQuaRE が参考になる。下記表では、JIS X 25012 ソフトウェア製品の品質要求及び評価(SQuaRE)-データ品質モデル に記載されている データ品質特性、および、定義を引用している。

番号 データ品質特性 データ品質特性(英語) 定義
1 正確性 Accuracy 特定の利用状況において,意図した概念又は事象の属性の真の値を正しく表現する属性をデータがもつ度合い。
2 完全性 Completeness 実体に関連する対象データが,特定の利用状況において,全ての期待された属性及び関係する実体インスタンスに対する値をもつ度合い。
3 一貫性 Consistency 特定の利用状況において,矛盾がないという属性及び他のデータと首尾一貫しているという属性をデータがもつ度合い。
4 信ぴょう(憑)性 Credibility 特定の利用状況において,利用者によって真(実)で信頼できるとみなされる属性をデータがもつ度合い。
5 最新性 Currentness 特定の利用状況において,データが最新の値である属性をもつ度合い。
6 アクセシビリティ Accessibility 特に,幾つかの障害が原因で,支援技術又は特別の機器構成を必要とする人々が,特定の利用状況において,データにアクセスできる度合い。
7 標準適合性 Compliance 特定の利用状況において,データ品質に関係する,規格,協定又は規範,及び類似の規則を遵守する属性をデータがもつ度合い。
8 機密性 Confidentiality 特定の利用状況において,承認された利用者によってだけ利用でき,解釈できることを保証する属性をデータがもつ度合い。
9 効率性 Efficiency 特定の利用状況において,適切な量及び種類の資源を使用することによって処理することができ,期待された水準の性能を提供できる属性をデータがもつ度合い。
10 精度 Precision 正確な属性,又は特定の利用状況において弁別を提供する属性をデータがもつ度合い。
11 追跡可能性 Traceability 特定の利用状況において,データへのアクセス及びデータに実施された変更の監査証跡を提供する属性をデータがもつ度合い。
12 理解性 Understandability 利用者がデータを読み,説明することができる属性で,特定の利用状況において,適切な言語,シンボル及び単位で表現された属性をデータがもつ度合い。
13 可用性 Availability 特定の利用状況において,承認された利用者及び/又はアプリケーションがデータを検索できる属性をデータがもつ度合い。
14 移植性 Portability 特定の利用状況において,既存の品質を維持しながら,データを一つのシステムから他のシステムに実装したり,置き換えたり,移動したりできる属性をデータがもつ度合い。
15 回復性 Recoverability 特定の利用状況において,故障発生の場合でさえ,明示された水準の操作及び品質を継続し,維持することを可能にする属性をデータがもつ度合い。

データ品質特性にデータ品質評価方法(データ品質測定量の名称)を対応させることで、ソフトウェアでの実装につながる。データ品質評価方法に対して、ソフトウェアでの実装方法(例えば、Great Expectations を用いる場合には、利用する Expectations)を合わせて検討。

データ品質特性 データ品質評価方法(データ品質測定量の名称)
正確性 業務モデルとデータ仕様書に基づいたカラムの値の正確性の確認
正確性 データ仕様書と実際のデータに基づいたカラムの値の正確性の確認
正確性 ソースシステムの承認
正確性 区切りテキストファイルからデータ読み込み可否の確認
正確性 JSONファイルからデータ読み込み可否の確認
正確性 XMLファイルからデータ読み込み可否の確認
正確性 NULL値の割合の確認
正確性 他データセットとの結合可否データ割合の確認
正確性 データの妥当性の確認
完全性 カラムの値の完全性の確認
完全性 データの網羅性の確認
一貫性 一意性制約(大文字と小文字の区別なし)の検証
一貫性 一意性制約(大文字と小文字の区別あり)の検証
一貫性 参照整合性(外部キー制約)の検証
一貫性 ソースシステムとのデータの一致の確認
一貫性 集計値と想定値の比較による確認
一貫性 母集団の比較による確認
一貫性 時系列での比較による確認
一貫性 データプロファイリングに基づいた一貫性の確認
信ぴょう(憑)性 データオーナーの有無の確認
信ぴょう(憑)性 データのSLAの確認
信ぴょう(憑)性 データのトレーサビリティの確認
信ぴょう(憑)性 データリネージによる確認
信ぴょう(憑)性 データのライフサイクルの確認
最新性 最新性の確認
最新性 データ連携処理情報の可視化
アクセシビリティ アクセシビリティの確認
アクセシビリティ データ取得の容易性の確認
アクセシビリティ データ利用の即時性の確認
アクセシビリティ データへのアクセス認証の可否の確認
アクセシビリティ データへのアクセス権限付与の可否の確認
アクセシビリティ データ蓄積期間の確認
標準適合性 標準適合性の確認
標準適合性 ドメイン制約(チェック制約)の検証
標準適合性 非NULL制約の検証
標準適合性 スキーマの一致の確認
標準適合性 データの型と桁数を確認
標準適合性 データ型の変換可否の確認
機密性 機密性の確認
機密性 コンプライアンス遵守の確認
効率性 効率性の確認
精度 データの精度の妥当性確認
精度 データの桁数の確認
精度 データの粒度の確認
追跡可能性 追跡可能性の確認
理解性 理解性の確認
可用性 可用性の確認
移植性 移植性の確認
回復性 回復性の確認

規制によりデータ品質要件が求められる場合がある。製薬業界ではデータインテグリティ(Data integrity)の確保が求められており、データの品質要件としてALCOA+の原則(次表の項目)が定義されている。

用語 用語_ja-jp
Attributable 帰属性
Legible 判読性
Contemporaneous 同時性
Original 原本性
Accurate 正確性
Complete 完全性
Consistent 一貫性
Enduring 不朽性
Available 利用可能性

参考書籍、および、参考リンク

特定の協会や社団法人などの組織により公開されている情報が参考になることもある。IoT領域におけるセンシングデータのデータ品質の指標が、一般社団法人データ社会推進協議会により、次のような品質測定量がセンシングデータのデータ品質評価基準策定に向けた提案にて公開されている。

区分 番号 デバイス依存の品質測定量 説明
設計情報 デバイスの情報 デバイスに入力された物理量(光、音など)の計測原理、処理方式等の把握レベル
故障のしにくさ デバイスの稼働レベル
耐久性 寿命部品の低下レベル
セキュリティの対策 セキュリティ対策の実施レベル
通信の安定性 通信が途絶、遅延なく動作するレベル
設置・調整 設置方法の適切さ 条件にあった適切な設置の実施レベル
運用・保守 システムの安定稼働 安定稼働の計画レベル
システムの環境監視 設置状況の把握レベル
アップデートの適切さ 適切なソフトウェアバージョンの運用レベル

引用元:センシングデータのデータ品質評価基準策定に向けた提案 - 一般社団法人データ社会推進協議会

2. PySpark(Python) によるデータ品質処理方法

Spark(+Delta Lake)にてデータ品質保証を行うには、次のような方法がある。

  1. Delta Lake の機能を利用する方法
  2. PySpark によるデータ品質処理を実施する方法
  3. Python ライブラリを利用する方法
  4. Spark プロバイダーの機能を利用する方法

データエンジニアリング時にデータ品質要件を満たさないデータが含まれる場合には、次の対応方法がある。データエンジニアリング後のデータを利用するシステム(ダウンストリーム)へのデータ提供が実施されないような仕組みが必要。

  • 無効なレコードを保持
  • 無効なレコードを分割
  • エラー終了

Python ライブラリを用いる方法では、下記のライブラリがある。

  1. Great Expectations

その他Python ライブラリには次のものがあるが、2022年8月17日時点で、利用する上での懸念事項があり。

# ライブラリ 懸念事項
1 Deequ Scala でのみ動作すること
2 PyDeequ 開発のサイクルが Deequ より遅く、Spark の最新バージョンへ対応がされない可能性があること
3 pandera 開発のロードマップが不明確

データ品質を確認する際によく利用する下記の項目値を、Spark では、DESCRIBE TABLE - Spark 3.3.0 Documentation (apache.org) の実行結果から取得することができる。

  • テーブルの行
  • データ型
  • 平均サイズ
  • 最小値
  • 最大値
  • 個別の値の数
  • Null 値の数

Spark 以外のデータソースの統計情報を取得する方法が、Denodo 社のドキュメントが参考になる。

Spark プロバイダーの機能を利用する方法では、次の方法がある。

1. Databicks の Delta Live Table における expectations(データ品質チェック)機能

参考リンク

3. データ品質処理の実績

3-1. Delta Lake の機能を利用する方法

Delta Lake では、データ品質に関する次の機能が利用できる。

  • Delta Lake で利用可能な機能
  • Databricks でのみ利用可能な機能

参考リンク

3-2. PySpark によるデータ品質処理を実施する方法

PySpark によるデータ品質処理を行うために、次の手順を実。

  1. データ品質検証を実施
  2. エラーレコードのデータフレームを作成
  3. データ品質検証内容のカラムを追加

Databricks でのデータ品質処理の実施例が次のリンクにて紹介されている。

データ品質チェック方法と確認方法の例を下記表に示す。

データ品質チェック方法 確認方法
指定値による、データの網羅性の確認 期待値を保持したデータフレームとすべて結合できることを確認。
参照テーブルにによる、データの網羅性の確認 期待値を保持したデータフレームとすべて結合できることを確認。
集計値と想定値の比較によるデータの完全性の確認 集計条件の結果と期待値が一致することを確認。
一意性制約の検証 指定したカラムでレコード数が1レコードであることを確認。
指定値による、参照整合性(外部キー制約)の検証 期待値を保持したデータフレームと結合できないレコードを確認。
参照テーブルによる、参照整合性(外部キー制約)の検証 期待値を保持したデータフレームと結合できないレコードを確認。
NOT NULL制約の検証 指定したカラムに NULL がないことを確認。
ドメイン制約(チェック制約)の検証 指定値をすべて保持していることを確認。
スキーマの一致の確認 2つのデータフレームの pyspark.sql.DataFrame.schema のリターン値が一致することを確認。
データの型と桁数を確認 期待値のデータ型に応じた WHERE 句のクエリ結果を確認。
区切りテキストファイル(CSV、TSV)におけるデータ型の確認 pyspark.sql.DataFrameReader.csv における columnNameOfCorruptRecord の値が存在しないことを確認。
データ型の変換可否の確認 データ型変換前後の件数が一致することを確認。

1. 一意性制約の検証の実践

# id 列で一意性違反のデータフレームを作成
data = [
    {'id': 1, 'name': 'a', 'age': 10},
    {'id': 1, 'name': 'b', 'age': 20},
    {'id': 3, 'name': 'c', 'age': 30},
]

tgt_df = spark.createDataFrame(data)

tgt_df.display()

image.png

# 1. データ品質検証を実施
# 指定したカラムの値で集計したレコード数が1件を超えるカラム値のデータフレームを取得
from pyspark.sql.functions import count, col, lit

unique_cols = [
    'id',
    #     'name',
]

duplicated_col_values_df = (
    tgt_df.groupBy(unique_cols)
    .agg(count(lit(1)).alias("_duplicte_count"))
    .where(col("_duplicte_count") > 1)
    .drop('_duplicte_count')
)

# エラーレコードの存在チェックによりデータ品質検証結果を確認
print(duplicated_col_values_df.count() == 0)
duplicated_col_values_df.display()

image.png

# 2. エラーレコードのデータフレームを作成
# 重複しているレコードを保持したデータフレームを取得
dupliated_data_df = tgt_df.join(duplicated_col_values_df, unique_cols, 'inner')
dupliated_data_df.display()

image.png

# 3. データ品質検証内容のカラムを追加
# エラーレコードのデータフレームに、データ品質検証内容を保持した`_data_quality_check_conditions`列を追加
from pyspark.sql.functions import array, struct

values = [
    lit('check_unique_key_constraint').alias('dq_check_name'),
    lit(False).alias('is_succeeded'),
    array([lit(col) for col in unique_cols]).alias('dq_check_cols'),
]


dq_checked_df = dupliated_data_df.withColumn(
    '_data_quality_check_conditions',
    array(struct(values)),
)

dq_checked_df.display()

image.png

# 補足
# 2つ以上の検証結果を付与したい場合には、`array_union`を利用する
from pyspark.sql.functions import array_union

values = [
    lit('add_test').alias('dq_check_name'),
    lit(False).alias('is_succeeded'),
    array([lit(col) for col in unique_cols]).alias('dq_check_cols'),
]


tgt_df_columns = dq_checked_df.columns
if '_data_quality_check_conditions' in tgt_df_columns:
    dq_checked_df_2 = dq_checked_df.withColumn(
        '_data_quality_check_conditions',
        array_union(
            col('_data_quality_check_conditions'),
            array(struct(values)),
        ),
    )

dq_checked_df_2.display()

image.png

3-3. Python ライブラリを利用する方法

1. Great Expectations による方法

詳細を Great Expectations(GE) によるノートブック型環境(Databricks)でのデータ品質保証方法のまとめ - Qiita の記事にて記載。

image.png

2
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?