こちらの記事で気づきました。
DatabricksでPySparkデータフレームの品質を検証するために設計された新たなPythonベースのデータ品質フレームワークとのことです。
まずは、DQXの動機づけなどを翻訳します。
モチベーション
現在のデータ品質フレームワークは、多くの場合特定の行や列のデータに関する詳細な説明の提供には欠けており、完全なデータセット向けに主に設計されており、ストリーミングワークロードへのインテグレーションを困難なものにしています。
このプロジェクトでは、PySparkデータフレームのデータ品質を評価するためのシンプルなPython検証フレームワークを導入します。事後のモニタリングのみに依存するのではなく、データ処理の過程でのリアルタイムの品質検証を可能にします。検証のアウトプットには、なぜ特定の行や列に問題があるのかに関する詳細な情報が含まれており、より迅速なデータ品質問題の特定や解決を可能にします。
出力に不正なデータが決して書き込まれないようにするために、不正なデータは検疫することができます。
レイクハウスアーキテクチャにおいては、不正なデータが後続のレイヤーに伝播しないようにするために、新規データの検証はキュレーテッドレイヤーにデータがエントリーされる際に行われるべきです。DQXを用いることで、データ品質制約に適合するように、容易に不正なデータを検疫し、キュレーションの後に再取り込みすることができます。
Deltaテーブルにすでに永続化されたデータ品質のモニタリングに関しては、Databricksのレイクハウスモニタリングを使うことをお勧めします。
キーとなる機能
- なぜチェックが失敗したのかに関する情報。
- データフォーマット非依存(Sparkデータフレームに対して動作)。
- DLT(Delta Live Tables)を含むSparkのバッチ処理とストリーミング処理をサポート。
- 不正データの削除、マーク、検疫などチェックに失敗した際に様々なリアクションが可能。
- チェックレベルのサポート: 警告(マーク)やエラー(マークを行い、行を伝播させない)。
- 行や列レベルでの品質ルールのサポート。
- データ品質ルール候補のプロファイリングと生成。
- コードや設定としての定義のチェック。
- データ品質問題を特定、追跡するための検証結果のサマリーとデータ品質ダッシュボード。
要件
- Python 3.10以降。手順をご覧ください。
- Unity Catalogが有効化されたDatabricksワークスペース
- インストールプロセスで使用されるDatabricksワークスペースへのネットワークアクセス。
- (オプション)Databricks CLI v0.213以降。手順をご覧ください。
- Spark 3.5.0以降のDatabricksランタイム。手順をご覧ください。
それでは、こちらのGithubのサンプルを動かしてみます。
ライブラリとしてDQXを使用
クラスターへのDQXのインストール
%pip install databricks-labs-dqx
dbutils.library.restartPython()
プロファイラーを使用して品質ルール候補を生成する
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.sdk import WorkspaceClient
import yaml
schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
display(summary_stats)
display(profiles)
# DQX品質ルール/チェックを生成
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # デフォルトレベルは "error"
print(yaml.safe_dump(checks))
# DLTエクスペクテーションを生成
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
print(dlt_expectations)
display(summary_stats)の出力
{'col1': {'count': 2,
'mean': 1.5,
'stddev': 0.7071067811865476,
'min': 1,
'25%': 1,
'50%': 1,
'75%': 2,
'max': 2,
'count_non_null': 2,
'count_null': 0},
'col2': {'count': 2,
'mean': 3.0,
'stddev': None,
'min': 3,
'25%': 3,
'50%': 3,
'75%': 3,
'max': 3,
'count_non_null': 1,
'count_null': 1},
'col3': {'count': 2,
'mean': 3.5,
'stddev': 0.7071067811865476,
'min': 3,
'25%': 3,
'50%': 3,
'75%': 4,
'max': 4,
'count_non_null': 2,
'count_null': 0},
'col4': {'count': 2,
'mean': 1.0,
'stddev': 0.0,
'min': 1,
'25%': 1,
'50%': 1,
'75%': 1,
'max': 1,
'count_non_null': 2,
'count_null': 0}}
display(profiles)の出力
print(yaml.safe_dump(checks))の出力
- check:
arguments:
col_name: col1
function: is_not_null
criticality: error
name: col1_is_null
- check:
arguments:
col_name: col1
max_limit: 2
min_limit: 1
function: is_in_range
criticality: error
name: col1_isnt_in_range
- check:
arguments:
col_name: col3
function: is_not_null
criticality: error
name: col3_is_null
- check:
arguments:
col_name: col3
max_limit: 4
min_limit: 3
function: is_in_range
criticality: error
name: col3_isnt_in_range
- check:
arguments:
col_name: col4
function: is_not_null
criticality: error
name: col4_is_null
print(dlt_expectations)の出力
['CONSTRAINT col1_is_not_null EXPECT (col1 is not null)', 'CONSTRAINT col1_min_max EXPECT (col1 >= 1 and col1 <= 2)', 'CONSTRAINT col3_is_not_null EXPECT (col3 is not null)', 'CONSTRAINT col3_min_max EXPECT (col3 >= 3 and col3 <= 4)', 'CONSTRAINT col4_is_not_null EXPECT (col4 is not null)']
品質チェックの検証
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = yaml.safe_load("""
- criticality: "invalid_criticality"
check:
function: "is_not_null"
arguments:
col_names:
- "col1"
- "col2"
""")
dq_engine = DQEngine(WorkspaceClient())
status = dq_engine.validate_checks(checks)
print(status.has_errors)
print(status.errors)
True
["Invalid value for 'criticality' field: {'criticality': 'invalid_criticality', 'check': {'function': 'is_not_null', 'arguments': {'col_names': ['col1', 'col2']}}}"]
yamlライクの辞書を使用して品質ルールを適用する
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = yaml.safe_load("""
- criticality: "error"
check:
function: "is_not_null"
arguments:
col_names:
- "col1"
- "col2"
- criticality: "error"
check:
function: "is_not_null_and_not_empty"
arguments:
col_name: "col3"
- criticality: "warn"
check:
function: "value_is_in_list"
arguments:
col_name: "col4"
allowed:
- 1
- 2
""")
schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)
# オプション1: データフレームに品質ルールを適用し、有効データと無効データ(隔離データ)を提供、チェックは自動的に検証されます
#valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)
# オプション2: データフレームに品質ルールを適用し、問題を追加の列(`_warning` と `_error`)として報告、チェックは自動的に検証されます
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantined_df)
DQXクラスを使用して品質ルールを適用する
from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine, DQRule, DQRuleColSet
from databricks.sdk import WorkspaceClient
checks = DQRuleColSet( # 複数の列に対してルールを定義
columns=["col1", "col2"],
criticality="error",
check_func=is_not_null).get_rules() + [
DQRule( # 単一の列に対してルールを定義
name='col3_is_null_or_empty',
criticality='error',
check=is_not_null_and_not_empty('col3')),
DQRule( # 名前は提供されない場合自動生成
criticality='warn',
check=value_is_in_list('col4', ['1', '2']))
]
schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)
# オプション1: データフレームに品質ルールを適用し、有効データと無効データ(隔離されたデータ)を提供
#valid_df, quarantined_df = apply_checks_and_split(input_df, checks)
# オプション2: データフレームに品質ルールを適用し、問題を追加の列(`_warning` と `_error`)として報告
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
display(valid_and_quarantined_df)
メダリオンアーキテクチャでチェックを適用する
# ブロンズレイヤーを準備
bronze_path = "/tmp/dqx_demo/bronze"
df = spark.read.format("delta").load("/databricks-datasets/delta-sharing/samples/nyctaxi_2019")
df.write.format("delta").mode("overwrite").save(bronze_path)
# データ品質チェックを定義
import yaml
checks = yaml.safe_load("""
- check:
function: "is_not_null"
arguments:
col_names:
- "vendor_id"
- "pickup_datetime"
- "dropoff_datetime"
- "passenger_count"
criticality: "error"
- check:
function: "is_not_null_and_not_empty"
arguments:
col_name: "vendor_id"
trim_strings: true
name: "vendor_id_is_null_or_empty"
criticality: "error"
- check:
function: "not_in_future"
arguments:
col_name: "pickup_datetime"
name: "pickup_datetime_not_in_future"
criticality: "warn"
- check:
function: "not_in_future"
arguments:
col_name: "dropoff_datetime"
name: "dropoff_datetime_not_in_future"
criticality: "warn"
- check:
function: "is_in_range"
arguments:
col_name: "passenger_count"
min_limit: 0
max_limit: 6
name: "passenger_incorrect_count"
criticality: "warn"
- check:
function: "is_not_null"
arguments:
col_name: "trip_distance"
name: "trip_distance_is_null"
criticality: "error"
""")
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# シルバーレイヤーに処理するときにチェックを適用
bronze = spark.read.format("delta").load(bronze_path)
silver, quarantine = dq_engine.apply_checks_by_metadata_and_split(bronze, checks)
チェックが適用された後のシルバーレイヤーのデータを表示します。チェックに通過したレコードのみが含まれています。
display(silver)
検疫されたデータを表示します。エラーや警告のあったレコードが理由とともに格納されています。
display(quarantine)
独自のカスタムチェックを作成する
カスタムチェック関数を作成する
import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition
def ends_with_foo(col_name: str) -> Column:
column = F.col(col_name)
return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
カスタムチェック関数を適用する
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *
# 使用組み込み、カスタム、およびSQL式チェック
checks = yaml.safe_load(
"""
- criticality: "error"
check:
function: "is_not_null_and_not_empty"
arguments:
col_name: "col1"
- criticality: "error"
check:
function: "ends_with_foo"
arguments:
col_name: "col1"
- criticality: "error"
check:
function: "sql_expression"
arguments:
expression: "col1 LIKE 'str%'"
msg: "col1 starts with 'str'"
"""
)
schema = "col1: string"
input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema)
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
display(valid_and_quarantined_df)
データフレーム全体ではなく、行や列単位での品質チェックができるというのは、確かに(固定されたサイズのデータフレームではない)ストリーミングワークロードでは特に役立ちそうです。