概要
この記事では、Spark 3.5 にて実装された PySpark の Testing 機能の1つである assertDataFrameEqual
についての検証結果を共有します。assertDataFrameEqualはPySparkのテストユーティリティの一部で、2つのデータフレームが等しいかどうかを検証するためのメソッドです。検証したケースのコードを記事内で後述しています。
検証では、正常系(データに差異がない場合)の動作確認やデータに差異がある場合の動作確認などのケースを実施しました。しかし、大量データでの検証ではタイムアウトの問題が発生し、テーブル間でのデータ差異の確認には利用できなさそうでした。ただ、まだリリースされて間もないため、今後改善される可能性があります。
次の記事を参考にしました。
- PySpak における Testing 機能の概要
- assertDataFrameEqual のドキュメント
事前準備
ライブラリのimport
import datetime
from pyspark.sql import Row
from decimal import Decimal
from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual
メソッドの基本的な動作確認
正常系の動作確認
test_schema_01 = """
id int,
name string,
weight float,
birth_date date
"""
test_data_01 = [
Row(id=1, name="user_aaa", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
Row(id=2, name="user_bbb", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
Row(id=3, name="user_ccc", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
]
test_df_01 = spark.createDataFrame(test_data_01, test_schema_01)
test_df_01.display()
assertDataFrameEqual(
test_df_01,
test_df_01,
)
データに差異がある場合の動作確認
exp_schema_01 = """
id int,
name string,
weight float,
birth_date date
"""
exp_data_01 = [
Row(id=4, name="user_aaa", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
Row(id=2, name="user_bbb", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
Row(id=3, name="user_ccc", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
]
exp_df_01 = spark.createDataFrame(exp_data_01, exp_schema_01)
exp_df_01.display()
assertDataFrameEqual(
test_df_01,
exp_df_01,
)
次のようなエラーが発生しました。
assertDataFrameEqual(
test_df_01,
exp_df_01,
)
PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 100.00000 % )
*** actual ***
[31m- Row(id=1, name='user_aaa', weight=53.5, birth_date=datetime.date(2020, 1, 1))[0m
Row(id=2, name='user_bbb', weight=53.5, birth_date=datetime.date(2020, 1, 1))
Row(id=3, name='user_ccc', weight=53.5, birth_date=datetime.date(2020, 1, 1))
*** expected ***
Row(id=2, name='user_bbb', weight=53.5, birth_date=datetime.date(2020, 1, 1))
Row(id=3, name='user_ccc', weight=53.5, birth_date=datetime.date(2020, 1, 1))
[31m+ Row(id=4, name='user_aaa', weight=53.5, birth_date=datetime.date(2020, 1, 1))[0m
スキーマに差異がある場合の動作確認
exp_schema_02 = """
id int
"""
exp_data_02 = [
Row(id=1),
]
exp_df_02 = spark.createDataFrame(exp_data_02, exp_schema_02)
exp_df_02.display()
assertDataFrameEqual(
test_df_01,
exp_df_02,
)
次のようなエラーが発生しました。
assertDataFrameEqual(
PySpark の Testing 機能にはassertSchemaEqual
というメソッドもあり、同様のエラーが発生しました。
# assertSchemaEqual との比較
assertSchemaEqual(
test_df_01.schema,
exp_df_02.schema,
)
--- actual
+++ expected
- StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('weight', FloatType(), True), StructField('birth_date', DateType(), True)])
+ StructType([StructField('id', IntegerType(), True)])
エラーハンドリング
try/exceptブロックによりエラーを取得したい場合には、次のようなコードにより可能です。
from pyspark.testing.utils import PySparkAssertionError
try:
assertDataFrameEqual(test_df_01, exp_df_01)
except PySparkAssertionError:
print("エラーです")
データ型の対応可否確認
正常系
test_data_02 = [
Row(
string_column="AAA",
byte_column=1,
integer_column=1,
bigint_column=1,
float_column=12.300000190734863,
double_column=12.3,
numeric_column=Decimal("12"),
boolean_column=True,
date_column=datetime.date(2020, 1, 1),
timestamp_column=datetime.datetime(2021, 1, 1, 0, 0),
timestamp_ntz_column=datetime.datetime(2021, 1, 1, 0, 0),
binary_column=bytearray(b"A"),
struct_column=Row(struct_string_column="AAA", struct_int_column=1),
array_column=["AAA", "BBB", "CCC"],
map_column={"AAA": 1},
)
]
test_schema_02 = """
--文字型
string_column string,
--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,
--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,
--真偽型
boolean_column boolean,
--日付時刻
date_column date,
timestamp_column timestamp,
timestamp_ntz timestamp_ntz,
--バイナリー型
binary_column binary,
--複合型
struct_column struct<
struct_string_column :string,
struct_int_column :int
>,
array_column array<string>,
map_column map<string, int>
"""
test_df_02 = spark.createDataFrame(test_data_02, test_schema_02)
assertDataFrameEqual(
test_df_02,
test_df_02,
)
異常系
exp_data_03 = [
Row(
string_column="ABC",
byte_column=1,
integer_column=1,
bigint_column=1,
float_column=12.300000190734863,
double_column=12.3,
numeric_column=Decimal("12"),
boolean_column=True,
date_column=datetime.date(2020, 1, 1),
timestamp_column=datetime.datetime(2021, 1, 1, 0, 0),
timestamp_ntz_column=datetime.datetime(2021, 1, 1, 0, 0),
binary_column=bytearray(b"A"),
struct_column=Row(struct_string_column="AAA", struct_int_column=1),
array_column=["AAA", "BBB", "CCC"],
map_column={"AAA": 1},
)
]
exp_schema_03 = """
--文字型
string_column string,
--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,
--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,
--真偽型
boolean_column boolean,
--日付時刻
date_column date,
timestamp_column timestamp,
timestamp_ntz timestamp_ntz,
--バイナリー型
binary_column binary,
--複合型
struct_column struct<
struct_string_column :string,
struct_int_column :int
>,
array_column array<string>,
map_column map<string, int>
"""
exp_df_03 = spark.createDataFrame(exp_data_03, exp_schema_03)
exp_df_03.display()
assertDataFrameEqual(
test_df_02,
exp_df_03,
)
次のようなエラーが発生しました。
PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 100.00000 % )
*** actual ***
[31m! Row(string_column='AAA', byte_column=1, integer_column=1, bigint_column=1, float_column=12.300000190734863, double_column=12.3, numeric_column=Decimal('12'), boolean_column=True, date_column=datetime.date(2020, 1, 1), timestamp_column=datetime.datetime(2021, 1, 1, 0, 0), timestamp_ntz=datetime.datetime(2021, 1, 1, 0, 0), binary_column=bytearray(b'A'), struct_column=Row(struct_string_column='AAA', struct_int_column=1), array_column=['AAA', 'BBB', 'CCC'], map_column={'AAA': 1})[0m
*** expected ***
[31m! Row(string_column='ABC', byte_column=1, integer_column=1, bigint_column=1, float_column=12.300000190734863, double_column=12.3, numeric_column=Decimal('12'), boolean_column=True, date_column=datetime.date(2020, 1, 1), timestamp_column=datetime.datetime(2021, 1, 1, 0, 0), timestamp_ntz=datetime.datetime(2021, 1, 1, 0, 0), binary_column=bytearray(b'A'), struct_column=Row(struct_string_column='AAA', struct_int_column=1), array_column=['AAA', 'BBB', 'CCC'], map_column={'AAA': 1})[0m
大量データの場合の動作確認
正常系
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/part.tbl"
schema = """
p_partkey long,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size int,
p_container string,
p_retailprice decimal(12, 2),
p_comment string
"""
test_df_04 = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
print(test_df_04.count())
assertDataFrameEqual(
test_df_04,
test_df_04,
)
異常系
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/part.tbl"
schema = """
p_partkey long,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size int,
p_container string,
p_retailprice decimal(12, 2),
p_comment string
"""
exp_df_04 = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
exp_df_04 = exp_df_04.sample(0.9999)
print(exp_df_04.count())
assertDataFrameEqual(
test_df_04,
exp_df_04,
)
次のようなメッセージが表示され、タイムアウトしてしまい、データの差異があることを確認できませんでした。
TimeoutException: Futures timed out after [30 seconds]
結論
本記事では、PySparkのassertDataFrameEqual
関数の使用方法とその検証結果について説明しました。この関数は、データフレームの等価性を確認するための強力なツールであり、データ型の違いなどに対しても正確に動作します。しかし、パフォーマンスについても考慮する必要があります。この関数を理解し、適切に使用することで、データフレームのテストがより簡単かつ効率的になります。