2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

こちらの記事で気づきました。

DatabricksでPySparkデータフレームの品質を検証するために設計された新たなPythonベースのデータ品質フレームワークとのことです。

まずは、DQXの動機づけなどを翻訳します。

モチベーション

現在のデータ品質フレームワークは、多くの場合特定の行や列のデータに関する詳細な説明の提供には欠けており、完全なデータセット向けに主に設計されており、ストリーミングワークロードへのインテグレーションを困難なものにしています。

このプロジェクトでは、PySparkデータフレームのデータ品質を評価するためのシンプルなPython検証フレームワークを導入します。事後のモニタリングのみに依存するのではなく、データ処理の過程でのリアルタイムの品質検証を可能にします。検証のアウトプットには、なぜ特定の行や列に問題があるのかに関する詳細な情報が含まれており、より迅速なデータ品質問題の特定や解決を可能にします。

出力に不正なデータが決して書き込まれないようにするために、不正なデータは検疫することができます。

レイクハウスアーキテクチャにおいては、不正なデータが後続のレイヤーに伝播しないようにするために、新規データの検証はキュレーテッドレイヤーにデータがエントリーされる際に行われるべきです。DQXを用いることで、データ品質制約に適合するように、容易に不正なデータを検疫し、キュレーションの後に再取り込みすることができます。

Deltaテーブルにすでに永続化されたデータ品質のモニタリングに関しては、Databricksのレイクハウスモニタリングを使うことをお勧めします。

キーとなる機能

  • なぜチェックが失敗したのかに関する情報。
  • データフォーマット非依存(Sparkデータフレームに対して動作)。
  • DLT(Delta Live Tables)を含むSparkのバッチ処理とストリーミング処理をサポート。
  • 不正データの削除、マーク、検疫などチェックに失敗した際に様々なリアクションが可能。
  • チェックレベルのサポート: 警告(マーク)やエラー(マークを行い、行を伝播させない)。
  • 行や列レベルでの品質ルールのサポート。
  • データ品質ルール候補のプロファイリングと生成。
  • コードや設定としての定義のチェック。
  • データ品質問題を特定、追跡するための検証結果のサマリーとデータ品質ダッシュボード。

要件

それでは、こちらのGithubのサンプルを動かしてみます。

ライブラリとしてDQXを使用

クラスターへのDQXのインストール

%pip install databricks-labs-dqx
dbutils.library.restartPython()

プロファイラーを使用して品質ルール候補を生成する

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.sdk import WorkspaceClient
import yaml

schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)

ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
display(summary_stats)
display(profiles)

# DQX品質ルール/チェックを生成
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)  # デフォルトレベルは "error"
print(yaml.safe_dump(checks))

# DLTエクスペクテーションを生成
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
print(dlt_expectations)

display(summary_stats)の出力

{'col1': {'count': 2,
  'mean': 1.5,
  'stddev': 0.7071067811865476,
  'min': 1,
  '25%': 1,
  '50%': 1,
  '75%': 2,
  'max': 2,
  'count_non_null': 2,
  'count_null': 0},
 'col2': {'count': 2,
  'mean': 3.0,
  'stddev': None,
  'min': 3,
  '25%': 3,
  '50%': 3,
  '75%': 3,
  'max': 3,
  'count_non_null': 1,
  'count_null': 1},
 'col3': {'count': 2,
  'mean': 3.5,
  'stddev': 0.7071067811865476,
  'min': 3,
  '25%': 3,
  '50%': 3,
  '75%': 4,
  'max': 4,
  'count_non_null': 2,
  'count_null': 0},
 'col4': {'count': 2,
  'mean': 1.0,
  'stddev': 0.0,
  'min': 1,
  '25%': 1,
  '50%': 1,
  '75%': 1,
  'max': 1,
  'count_non_null': 2,
  'count_null': 0}}

display(profiles)の出力

Screenshot 2025-01-16 at 9.37.48.png

print(yaml.safe_dump(checks))の出力

- check:
    arguments:
      col_name: col1
    function: is_not_null
  criticality: error
  name: col1_is_null
- check:
    arguments:
      col_name: col1
      max_limit: 2
      min_limit: 1
    function: is_in_range
  criticality: error
  name: col1_isnt_in_range
- check:
    arguments:
      col_name: col3
    function: is_not_null
  criticality: error
  name: col3_is_null
- check:
    arguments:
      col_name: col3
      max_limit: 4
      min_limit: 3
    function: is_in_range
  criticality: error
  name: col3_isnt_in_range
- check:
    arguments:
      col_name: col4
    function: is_not_null
  criticality: error
  name: col4_is_null

print(dlt_expectations)の出力

['CONSTRAINT col1_is_not_null EXPECT (col1 is not null)', 'CONSTRAINT col1_min_max EXPECT (col1 >= 1 and col1 <= 2)', 'CONSTRAINT col3_is_not_null EXPECT (col3 is not null)', 'CONSTRAINT col3_min_max EXPECT (col3 >= 3 and col3 <= 4)', 'CONSTRAINT col4_is_not_null EXPECT (col4 is not null)']

品質チェックの検証

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: "invalid_criticality"
  check:
    function: "is_not_null"
    arguments:
      col_names:
        - "col1"
        - "col2"
""")

dq_engine = DQEngine(WorkspaceClient())
status = dq_engine.validate_checks(checks)
print(status.has_errors)
print(status.errors)
True
["Invalid value for 'criticality' field: {'criticality': 'invalid_criticality', 'check': {'function': 'is_not_null', 'arguments': {'col_names': ['col1', 'col2']}}}"]

yamlライクの辞書を使用して品質ルールを適用する

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: "error"
  check:
    function: "is_not_null"
    arguments:
      col_names:
        - "col1"
        - "col2"

- criticality: "error"
  check:
    function: "is_not_null_and_not_empty"
    arguments:
      col_name: "col3"

- criticality: "warn"
  check:
    function: "value_is_in_list"
    arguments:
      col_name: "col4"
      allowed:
        - 1
        - 2
""")

schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)

# オプション1: データフレームに品質ルールを適用し、有効データと無効データ(隔離データ)を提供、チェックは自動的に検証されます
#valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)

# オプション2: データフレームに品質ルールを適用し、問題を追加の列(`_warning` と `_error`)として報告、チェックは自動的に検証されます
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantined_df)

Screenshot 2025-01-16 at 9.41.17.png

DQXクラスを使用して品質ルールを適用する

from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine, DQRule, DQRuleColSet
from databricks.sdk import WorkspaceClient

checks = DQRuleColSet( # 複数の列に対してルールを定義
            columns=["col1", "col2"],
            criticality="error",
            check_func=is_not_null).get_rules() + [
         DQRule( # 単一の列に対してルールを定義
            name='col3_is_null_or_empty',
            criticality='error',
            check=is_not_null_and_not_empty('col3')),
         DQRule( # 名前は提供されない場合自動生成
            criticality='warn',
            check=value_is_in_list('col4', ['1', '2']))
        ]

schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)

# オプション1: データフレームに品質ルールを適用し、有効データと無効データ(隔離されたデータ)を提供
#valid_df, quarantined_df = apply_checks_and_split(input_df, checks)

# オプション2: データフレームに品質ルールを適用し、問題を追加の列(`_warning` と `_error`)として報告
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)

display(valid_and_quarantined_df)

Screenshot 2025-01-16 at 9.41.45.png

メダリオンアーキテクチャでチェックを適用する

# ブロンズレイヤーを準備
bronze_path = "/tmp/dqx_demo/bronze"
df = spark.read.format("delta").load("/databricks-datasets/delta-sharing/samples/nyctaxi_2019")
df.write.format("delta").mode("overwrite").save(bronze_path)
# データ品質チェックを定義
import yaml

checks = yaml.safe_load("""
- check:
    function: "is_not_null"
    arguments:
      col_names:
        - "vendor_id"
        - "pickup_datetime"
        - "dropoff_datetime"
        - "passenger_count"
  criticality: "error"

- check:
    function: "is_not_null_and_not_empty"
    arguments:
      col_name: "vendor_id"
      trim_strings: true
  name: "vendor_id_is_null_or_empty"
  criticality: "error"

- check:
    function: "not_in_future"
    arguments:
      col_name: "pickup_datetime"
  name: "pickup_datetime_not_in_future"
  criticality: "warn"

- check:
    function: "not_in_future"
    arguments:
      col_name: "dropoff_datetime"
  name: "dropoff_datetime_not_in_future"
  criticality: "warn"

- check:
    function: "is_in_range"
    arguments:
      col_name: "passenger_count"
      min_limit: 0
      max_limit: 6
  name: "passenger_incorrect_count"
  criticality: "warn"

- check:
    function: "is_not_null"
    arguments:
      col_name: "trip_distance"
  name: "trip_distance_is_null"
  criticality: "error"
""")
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# シルバーレイヤーに処理するときにチェックを適用
bronze = spark.read.format("delta").load(bronze_path)
silver, quarantine = dq_engine.apply_checks_by_metadata_and_split(bronze, checks)

チェックが適用された後のシルバーレイヤーのデータを表示します。チェックに通過したレコードのみが含まれています。

display(silver)

Screenshot 2025-01-16 at 9.43.40.png

検疫されたデータを表示します。エラーや警告のあったレコードが理由とともに格納されています。

display(quarantine)

Screenshot 2025-01-16 at 9.44.03.png

独自のカスタムチェックを作成する

カスタムチェック関数を作成する

import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition

def ends_with_foo(col_name: str) -> Column:
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")

カスタムチェック関数を適用する

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

# 使用組み込み、カスタム、およびSQL式チェック
checks = yaml.safe_load(
"""
- criticality: "error"
  check:
    function: "is_not_null_and_not_empty"
    arguments:
      col_name: "col1"
- criticality: "error"
  check:
    function: "ends_with_foo"
    arguments:
      col_name: "col1"
- criticality: "error"
  check:
    function: "sql_expression"
    arguments:
      expression: "col1 LIKE 'str%'"
      msg: "col1 starts with 'str'"
"""
)

schema = "col1: string"
input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema)

dq_engine = DQEngine(WorkspaceClient())

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
display(valid_and_quarantined_df)

Screenshot 2025-01-16 at 9.47.02.png

データフレーム全体ではなく、行や列単位での品質チェックができるというのは、確かに(固定されたサイズのデータフレームではない)ストリーミングワークロードでは特に役立ちそうです。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?