2
2

PySparkデータフレームの等価性テスト関数のご紹介

Last updated at Posted at 2024-04-29

Simplify PySpark testing with DataFrame equality functions | Databricks Blogの記事の翻訳、かつコードをウォークスルーします。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

DataFrame equality test functionsはPySparkのユニットテストをシンプルにするために、Apache Spark™ 3.5とDatabricks Runtime 14.2で導入されました。ここで説明する機能のフルセットは、間も無く提供されるApache Spark 4.0とDatabricks Runtime 14.3で利用できるようになります。

注意
ここではDatabricks Runtime 14.3を使用します。

すでに他の方も試されてました。

データフレーム等価性テスト関数を用いて、より自信をもってデータフレーム変換処理を記述できるように

PySparkにおけるデータの取り扱いには、データフレームの変換処理、集計処理、操作の適用が含まれます。変換処理が多くなっていった際に、あなたのコードが期待した通りに動作することに対してどのように自信を持つことができるのでしょうか?PySparkの等価性テストユーティリティ関数は、データが期待した結果になっていることをチェックするための効率的で効果的な手段を提供し、分析の初期段階で期待しない差異やエラーを特定する助けとなります。さらに、差異を正確にピンポイントできる直感的な情報を返却するので、デバッグに多大な時間を費やすることなしに、即座にアクションを取ることができます。

データフレーム等価性テスト関数の活用

Apache Spark 3.5で、PySparkデータフレームに対する2つの等価性テスト関数が導入されました: assertDataFrameEqualassertSchemaEqualです。これらをどのように使うのかを見ていきましょう。

assertDataFrameEqual: この関数によって、1行のコードで2つのPySparkデータフレームの等価性を比較することができ、データとスキーマがマッチしているかどうかをチェックします。違いがある際には説明的な情報を返却します。

サンプルをウォークスルーしましょう。最初に、意図的に最初の行に違いのある2つのデータフレームを作成します。

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])

df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])

そして、2つのデータフレームを用いてassertDataFrameEqualを呼び出します:

from pyspark.testing import assertDataFrameEqual

assertDataFrameEqual(df_actual, df_expected)

この関数は、2つのデータフレームの最初の行が異なっていることを示す説明的なメッセージを返却します。この例では、最初のAlfredの数値が異なっています(expected: 1500, actual: 1200)。

PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 25.00000 % )
*** actual ***
[31m! Row(name='Alfred', amount=1200)[0m
  Row(name='Alfred', amount=2500)
  Row(name='Anna', amount=3000)
  Row(name='Anna', amount=500)


*** expected ***
[31m! Row(name='Alfred', amount=1500)[0m
  Row(name='Alfred', amount=2500)
  Row(name='Anna', amount=3000)
  Row(name='Anna', amount=500)

この情報によって、お使いのコードが生成したデータフレームに問題があることをすぐに知ることができ、これに基づいてデバッグの対象を特定することが出来ます。

また、この関数には、特定のユースケースに合わせて調整できるように、データフレーム比較の厳密性を制御するためのいくつかのオプションがあります。

assertSchemaEqual: この関数は2つのデータフレームのスキーマのみを比較します。行のデータは比較しません。2つの異なるデータフレームにおいて、カラム名、データ型、nullableプロパティが同じであるかどうかを検証することができます。

例を見てみましょう。最初にスキーマの異なる2つのデータフレームを作成します:

schema_actual = "name STRING, amount DOUBLE"

data_expected = [["Alfred", 1500], ["Alfred", 2500], ["Anna", 500], ["Anna", 3000]]
data_actual = [["Alfred", 1500.0], ["Alfred", 2500.0], ["Anna", 500.0], ["Anna", 3000.0]]

df_expected = spark.createDataFrame(data = data_expected)
df_actual = spark.createDataFrame(data = data_actual, schema = schema_actual)

次に、これらの2つのデータフレームのスキーマを用いてassertSchemaEqualを呼び出します:

from pyspark.testing import assertSchemaEqual

assertSchemaEqual(df_actual.schema, df_expected.schema)

この関数は、2つのデータフレームのスキーマが違っていることを特定し、これらが異なっていることを示す情報を出力します:

PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
--- actual
+++ expected
- StructType([StructField('name', StringType(), True), StructField('amount', DoubleType(), True)])
?                          ^^^^                                     ^^^^^^   ^ ^^^^

+ StructType([StructField('_1', StringType(), True), StructField('_2', LongType(), True)])
?                          ^^                                     ^^   ^ ^^

この例では、2つの違いがあります: actualデータフレームのamountカラムはLONGですが、expectedではDOUBLEであり、expectedデータフレームではスキーマを指定せずに作成しているので、カラム名も異なっています。

これらの違いの両方は、上の通り関数の出力でハイライトされています。

assertPandasOnSparkEqualはApache Spark 3.5.1で非推奨となり、今後提供されるApache Spark 4.0.0では削除予定であるため、ここではカバーしません。Pandas API on Sparkでのテストに関しては、Pandas API on Sparkの等価性テスト関数をご覧ください。

PySparkデータフレームの違いをデバッグするための構造化されたアウトプット

assertDataFrameEqualassertSchemaEqualは主に、作成したPySpark関数をテストするために小規模なデータセットを用いることの多いユニットテストに主眼を置いていますが、数行や数列のデータ以上のデータフレームを用いたいと考えるかもしれません。このようなシナリオでは、さらにデバッグを容易にするために違っている行の行データを容易に取得することができます。

どのように行うのかをみていきましょう。2つのデータフレームを作成するために、上で使用したものと同じデータを使います:

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])
df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])

assertDataFrameEqualを呼び出した後に、アサーションエラーオブジェクトから2つのデータフレームで異なっているデータを取り出します:

from pyspark.testing import assertDataFrameEqual
from pyspark.errors import PySparkAssertionError

try:
    assertDataFrameEqual(df_actual, df_expected, includeDiffRows=True)
except PySparkAssertionError as e:
    # `e.data`は以下のようになっています:
    # [(Row(name='Alfred', amount=1200), Row(name='Alfred', amount=1500))]
    spark.createDataFrame(e.data, schema=["Actual", "Expected"]).show() 
+--------------+--------------+
|        Actual|      Expected|
+--------------+--------------+
|{Alfred, 1200}|{Alfred, 1500}|
+--------------+--------------+

これからわかるように、更なる分析のために異なっている行に関する情報を即座に活用することができます。デバッグのために、actualとexpectedのデータフレームからこの情報を抽出するためのコードを記述する必要はありません。

この機能は間も無く提供されるApache Spark 4.0とDBR 14.3で利用できるようになります。

Pandas API on Sparkの等価性テスト関数

PySparkデータフレームの等価性のテストのための関数に加えて、Pandas API on Sparkのユーザーは、以下のデータフレーム等価性テストの関数にアクセスできるようになります:

  • assert_frame_equal
  • assert_series_equal
  • assert_index_equal

これらの関数は、Pandas API on Sparkデータフレームに対する比較の厳密性を制御し、ユニットテストの助けとなる選択肢を提供します。これらは、pandas test utility functionsと全く同じAPIを提供するので、Pandas API on Sparkを用いてテストを行う際に、既存のpandasのテストコードを変更する必要がありません。

いくつかの異なるパラメータを用いてPandas API on Sparkデータフレームを比較するためのassert_frame_equalの活用方法の例を以下に示します:

from pyspark.pandas.testing import assert_frame_equal
import pyspark.pandas as ps

# 微妙に異なる2つのPandas API on Sparkデータフレームを作成
df1 = ps.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})  # 'b' column as integers

# 厳密なデータ型チェックによるデータフレーム等価性の検証
assert_frame_equal(df1, df2, check_dtype=True)

この例では、2つのデータフレームのスキーマが異なっています。この関数は違いを以下のように列挙します:

AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="b") are different

Attribute "dtype" are different
[left]:  float64
[right]: int64
File <command-3482761931373807>, line 9
      6 df2 = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})  # 'b' column as integers
      8 # Validate DataFrame equality with strict data type checking
----> 9 assert_frame_equal(df1, df2, check_dtype=True)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/pandas/_testing/asserters.py:614, in raise_assert_detail(obj, message, left, right, diff, first_diff, index_values)
    611 if first_diff is not None:
    612     msg += f"\n{first_diff}"
--> 614 raise AssertionError(msg)

この例のように、check_dtype引数を用いることで、カラムのデータ型が異なっていたとしても、カラムのデータを比較することを支持することができます:

# check_dtype=Falseを指定してデータフレームの等価性をチェック
assert_frame_equal(df1, df2, check_dtype=False)

こちらはエラーになりません。assert_frame_equalでカラムのデータ型を無視するように指定したので、2つのデータフレームは等価であるとみなされています。

また、これらの関数によって、Pandas API on Sparkオブジェクトとpandasオブジェクトを比較することができ、この例で示すようにさまざまなデータフレームライブラリ間での互換性を促進することができます。

import pandas as pd
from pyspark.pandas.testing import assert_series_equal
from pyspark.pandas.testing import assert_index_equal

# Pandasデータフレーム
df_pandas = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

# PandasデータフレームとPandas API on Sparkデータフレームの比較
assert_frame_equal(df1, df_pandas)

# PandasシリーズとPandas API on Sparkシリーズの比較
assert_series_equal(df1.a, df_pandas.a)

# PandasインデックスとPandas API on Sparkインデックスの比較
assert_index_equal(df1.index, df_pandas.index)

こちらもエラーにはなりません。

新たなPySpark DataFrameとPandas API on Sparkの等価性テスト関数の活用によって、あなたのPySparkコードが期待した通りに動作していることを確認するための優れた手段を提供します。これらの関数は、エラーの特定に役立つだけではなく、何がおかしいのかを正確に理解することで、問題がどこにあるのかをクイックかつ容易に特定できるようになります。詳細はTesting PySparkページをご覧ください。

これらの関数は、間も無く提供されるApache Spark 4.0で利用できるようになります。DBR 14.2ではすでにサポートされています。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2