2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PySpark の Testing 機能の1つである assertDataFrameEqual に対する基本的な検証結果

Last updated at Posted at 2024-01-09

概要

この記事では、Spark 3.5 にて実装された PySpark の Testing 機能の1つである assertDataFrameEqual についての検証結果を共有します。assertDataFrameEqualはPySparkのテストユーティリティの一部で、2つのデータフレームが等しいかどうかを検証するためのメソッドです。検証したケースのコードを記事内で後述しています。

検証では、正常系(データに差異がない場合)の動作確認やデータに差異がある場合の動作確認などのケースを実施しました。しかし、大量データでの検証ではタイムアウトの問題が発生し、テーブル間でのデータ差異の確認には利用できなさそうでした。ただ、まだリリースされて間もないため、今後改善される可能性があります。

次の記事を参考にしました。

事前準備

ライブラリのimport

import datetime

from pyspark.sql import Row
from decimal import Decimal

from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual

image.png

メソッドの基本的な動作確認

正常系の動作確認

test_schema_01 = """
id int,
name string,
weight float,
birth_date date
"""
test_data_01 = [
    Row(id=1, name="user_aaa", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
    Row(id=2, name="user_bbb", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
    Row(id=3, name="user_ccc", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
]
test_df_01 = spark.createDataFrame(test_data_01, test_schema_01)
test_df_01.display()
assertDataFrameEqual(
    test_df_01,
    test_df_01,
)

image.png

データに差異がある場合の動作確認

exp_schema_01 = """
id int,
name string,
weight float,
birth_date date
"""
exp_data_01 = [
    Row(id=4, name="user_aaa", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
    Row(id=2, name="user_bbb", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
    Row(id=3, name="user_ccc", weight=53.5, birth_date=datetime.date(2020, 1, 1)),
]
exp_df_01 = spark.createDataFrame(exp_data_01, exp_schema_01)
exp_df_01.display()
assertDataFrameEqual(
    test_df_01,
    exp_df_01,
)

image.png

次のようなエラーが発生しました。


assertDataFrameEqual(
    test_df_01,
    exp_df_01,
)
PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 100.00000 % )
*** actual ***
[31m- Row(id=1, name='user_aaa', weight=53.5, birth_date=datetime.date(2020, 1, 1))[0m
  Row(id=2, name='user_bbb', weight=53.5, birth_date=datetime.date(2020, 1, 1))
  Row(id=3, name='user_ccc', weight=53.5, birth_date=datetime.date(2020, 1, 1))


*** expected ***
  Row(id=2, name='user_bbb', weight=53.5, birth_date=datetime.date(2020, 1, 1))
  Row(id=3, name='user_ccc', weight=53.5, birth_date=datetime.date(2020, 1, 1))
[31m+ Row(id=4, name='user_aaa', weight=53.5, birth_date=datetime.date(2020, 1, 1))[0m

スキーマに差異がある場合の動作確認

exp_schema_02 = """
id int
"""
exp_data_02 = [
    Row(id=1),
]
exp_df_02 = spark.createDataFrame(exp_data_02, exp_schema_02)
exp_df_02.display()
assertDataFrameEqual(
    test_df_01,
    exp_df_02,
)

image.png

次のようなエラーが発生しました。

assertDataFrameEqual(

PySpark の Testing 機能にはassertSchemaEqualというメソッドもあり、同様のエラーが発生しました。

# assertSchemaEqual との比較
assertSchemaEqual(
    test_df_01.schema,
    exp_df_02.schema,
)

image.png

--- actual
+++ expected
- StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('weight', FloatType(), True), StructField('birth_date', DateType(), True)])
+ StructType([StructField('id', IntegerType(), True)])

エラーハンドリング

try/exceptブロックによりエラーを取得したい場合には、次のようなコードにより可能です。

from pyspark.testing.utils import PySparkAssertionError

try:
    assertDataFrameEqual(test_df_01, exp_df_01)
except PySparkAssertionError:
    print("エラーです")

image.png

データ型の対応可否確認

正常系

test_data_02 = [
    Row(
        string_column="AAA",
        byte_column=1,
        integer_column=1,
        bigint_column=1,
        float_column=12.300000190734863,
        double_column=12.3,
        numeric_column=Decimal("12"),
        boolean_column=True,
        date_column=datetime.date(2020, 1, 1),
        timestamp_column=datetime.datetime(2021, 1, 1, 0, 0),
        timestamp_ntz_column=datetime.datetime(2021, 1, 1, 0, 0),
        binary_column=bytearray(b"A"),
        struct_column=Row(struct_string_column="AAA", struct_int_column=1),
        array_column=["AAA", "BBB", "CCC"],
        map_column={"AAA": 1},
    )
]

test_schema_02 = """
--文字型
string_column string,

--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,

--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,

--真偽型
boolean_column boolean,

--日付時刻
date_column date,
timestamp_column timestamp,
timestamp_ntz timestamp_ntz,

--バイナリー型
binary_column binary,

--複合型
struct_column struct<
    struct_string_column :string,
    struct_int_column    :int
>,
array_column array<string>,
map_column map<string, int>
"""

test_df_02 = spark.createDataFrame(test_data_02, test_schema_02)
assertDataFrameEqual(
    test_df_02,
    test_df_02,
)

image.png

異常系

exp_data_03 = [
    Row(
        string_column="ABC",
        byte_column=1,
        integer_column=1,
        bigint_column=1,
        float_column=12.300000190734863,
        double_column=12.3,
        numeric_column=Decimal("12"),
        boolean_column=True,
        date_column=datetime.date(2020, 1, 1),
        timestamp_column=datetime.datetime(2021, 1, 1, 0, 0),
        timestamp_ntz_column=datetime.datetime(2021, 1, 1, 0, 0),
        binary_column=bytearray(b"A"),
        struct_column=Row(struct_string_column="AAA", struct_int_column=1),
        array_column=["AAA", "BBB", "CCC"],
        map_column={"AAA": 1},
    )
]

exp_schema_03 = """
--文字型
string_column string,

--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,

--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,

--真偽型
boolean_column boolean,

--日付時刻
date_column date,
timestamp_column timestamp,
timestamp_ntz timestamp_ntz,

--バイナリー型
binary_column binary,

--複合型
struct_column struct<
    struct_string_column :string,
    struct_int_column    :int
>,
array_column array<string>,
map_column map<string, int>
"""

exp_df_03 = spark.createDataFrame(exp_data_03, exp_schema_03)
exp_df_03.display()
assertDataFrameEqual(
    test_df_02,
    exp_df_03,
)

image.png

次のようなエラーが発生しました。

PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 100.00000 % )
*** actual ***
[31m! Row(string_column='AAA', byte_column=1, integer_column=1, bigint_column=1, float_column=12.300000190734863, double_column=12.3, numeric_column=Decimal('12'), boolean_column=True, date_column=datetime.date(2020, 1, 1), timestamp_column=datetime.datetime(2021, 1, 1, 0, 0), timestamp_ntz=datetime.datetime(2021, 1, 1, 0, 0), binary_column=bytearray(b'A'), struct_column=Row(struct_string_column='AAA', struct_int_column=1), array_column=['AAA', 'BBB', 'CCC'], map_column={'AAA': 1})[0m


*** expected ***
[31m! Row(string_column='ABC', byte_column=1, integer_column=1, bigint_column=1, float_column=12.300000190734863, double_column=12.3, numeric_column=Decimal('12'), boolean_column=True, date_column=datetime.date(2020, 1, 1), timestamp_column=datetime.datetime(2021, 1, 1, 0, 0), timestamp_ntz=datetime.datetime(2021, 1, 1, 0, 0), binary_column=bytearray(b'A'), struct_column=Row(struct_string_column='AAA', struct_int_column=1), array_column=['AAA', 'BBB', 'CCC'], map_column={'AAA': 1})[0m

大量データの場合の動作確認

正常系

filepath = "dbfs:/databricks-datasets/tpch/data-001/part/part.tbl"

schema = """
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string
"""

test_df_04 = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
print(test_df_04.count())
assertDataFrameEqual(
    test_df_04,
    test_df_04,
)

image.png

異常系

filepath = "dbfs:/databricks-datasets/tpch/data-001/part/part.tbl"

schema = """
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string
"""

exp_df_04 = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
exp_df_04 = exp_df_04.sample(0.9999)
print(exp_df_04.count())
assertDataFrameEqual(
    test_df_04,
    exp_df_04,
)

image.png

次のようなメッセージが表示され、タイムアウトしてしまい、データの差異があることを確認できませんでした。

TimeoutException: Futures timed out after [30 seconds]

結論

本記事では、PySparkのassertDataFrameEqual関数の使用方法とその検証結果について説明しました。この関数は、データフレームの等価性を確認するための強力なツールであり、データ型の違いなどに対しても正確に動作します。しかし、パフォーマンスについても考慮する必要があります。この関数を理解し、適切に使用することで、データフレームのテストがより簡単かつ効率的になります。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?