2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks (Spark) にて Spark dataframe と Pandas 変換時の PyArrow が利用されないことによるエラー事象への対応方法

Last updated at Posted at 2024-03-10

概要

Databricks (Spark) にて Spark dataframe と Pandas 変換時の PyArrow が利用されないことによるエラーに遭遇したため、事象の再現方法と対応方法を共有します。

Spark dataframe と Pandas 変換時の PyArrow が利用されない場合(spark.sql.execution.arrow.pyspark.enabledfalseの場合)には、ドライバーノードとワーカーノードでデータの通信が行われ、spark.rpc.message.maxSizeの上限を超えることで次のエラーが発生します。メッセージではspark.rpc.message.maxSizeの上限を上げることが提案されていますが、その上限を上げる対応方法を実施しませんでした。

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 29:0 was 277567143 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

Databricks では、基本的にはspark.sql.execution.arrow.pyspark.enabledの設定値がデフォルトでtrueとなっておりますが、特定の状況下では次のような警告とともにspark.sql.execution.arrow.pyspark.enabledの設定値がfalseとなります。

/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  A null type field may not be non-nullable
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)

Array 型のカラムを追加する際にarrayメソッドをそのまま指定している場合に、上記の警告が表示されることがわかりました。その対応方法として、明示的に CAST する方法で警告が表示されなくなりました。

from pyspark.sql.functions import col, array

df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array())  ## lit([]) でも同様の結果となる

pdf = df.toPandas()

image.png

from pyspark.sql.functions import col, array

df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array().cast('array<void>'))
pdf = df.toPandas()

image.png

本記事では、調査手順、エラーの再現方法、及び、その対応方法を共有します。

調査手順

spark のレポジトリーをベースに Github Copilot Chat で本件について質問したところ、次のような回答が返ってきました。私が遭遇したケースでは、PySparkとArrowの間に互換性の問題があったのではないかと仮説をたてました。

下記の警告がArrow最適化が何らかの理由により表示されるケースを箇条書きで教えて。日本語で回答して

/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true;
spark.sql.execution.arrow.pyspark.enabledがtrueに設定されているため、Arrow最適化が試みられ、以下のようなケースで警告が表示される可能性があります:

- Arrowがインストールされていない場合。
- PySparkとArrowの間に互換性の問題がある場合。
- データ形式がArrowによる最適化をサポートしていない場合。
- Arrowの最適化がメモリ制限を超える可能性がある大規模なデータフレームに対して使用された場合。

具体的な警告メッセージは問題の詳細によりますが、上記のいずれかのシナリオが原因である可能性が高いです。

image.png

続けて、エラーに関するコードを Github Copilot Chat で提示してもらいました。リンク先のコードを確認したところ、to_arrow_schemaメソッドがデータ型の変換を実施していることがわかりました。

そのエラーに関係するコードを提示して

image.png

image.png

引用元:spark/python/pyspark/sql/pandas/conversion.py at 76b1c122cb7d77e8f175b25b935b9296a669d5d8 · apache/spark (github.com)

to_arrow_schemaメソッド内で利用されているto_arrow_typeにおける ArrayType の変換に関するコードを確認しました。

image.png

引用元:spark/python/pyspark/sql/pandas/types.py at 76b1c122cb7d77e8f175b25b935b9296a669d5d8 · apache/spark (github.com)

PyArrow 変換時のエラーが発生するケースとして、次のようなコードとなる場合(NullTypeかつnullableFalse)であることを特定できました。A null type field may not be non-nullableというspark.sql.execution.arrow.pyspark.enabledfalseとなった際の警告文内に記載されている一部のメッセージと一致しました。nullableTrueにするためには、カラム追加時に CAST すればよいことが、後述する対応方法を把握することができました。

import pyarrow as pa

print(
    pa.field(
        "element",
        pa.null(),
        False,
    )
)
ValueError: A null type field may not be non-nullable

image.png

事前準備

テストスキーマとテーブルの準備

%sql
CREATE SCHEMA if NOT EXISTS hive_metastore.oom_test;
CREATE OR REPLACE TABLE hive_metastore.oom_test.partsupp (
  ps_partkey long,
  ps_suppkey long,
  ps_availqty int,
  ps_supplycost float,
  ps_comment string
);
filepath = "dbfs:/databricks-datasets/tpch/data-001/partsupp/partsupp.tbl"
tgt_tbl = "hive_metastore.oom_test.partsupp"


schema = """
ps_partkey long,
ps_suppkey long,
ps_availqty int,
ps_supplycost float,
ps_comment string
"""

df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("sep", "|")
       .load(filepath)
    )

df.write.mode("overwrite").saveAsTable(tgt_tbl)

image.png

spark.sql.execution.arrow.pyspark.enabledfalseの場合にエラーになることを確認

spark.sql.execution.arrow.pyspark.enabledTrueの場合の動作確認

正常終了しました。

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))

tgt_tbl = "hive_metastore.oom_test.partsupp"

df = spark.table(tgt_tbl)
pdf = df.toPandas()

new_col = f"ps_comment_1"
pdf[new_col] = pdf['ps_comment'] + f"_1"

df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())

image.png

spark.sql.execution.arrow.pyspark.enabledFalseの場合の動作確認

エラーが発生しました。

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))

tgt_tbl = "hive_metastore.oom_test.partsupp"

df = spark.table(tgt_tbl)
pdf = df.toPandas()

new_col = f"ps_comment_1"
pdf[new_col] = pdf['ps_comment'] + f"_1"

df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 29:0 was 277567143 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

image.png

spark.sql.execution.arrow.pyspark.enabledFalse、かつ、Pandas のデータサイズが大きくなる処理を実行する場合

Spark ドライバーが異常終了します。

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))

tgt_tbl = "hive_metastore.oom_test.partsupp"

df = spark.table(tgt_tbl)
pdf = df.toPandas()

for i in range(1, 6):
    new_col = f"ps_comment_{i}"
    pdf[new_col] = pdf['ps_comment'] + f"_{i}"

df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

image.png

spark.sql.execution.arrow.pyspark.enabledfalseとなるケースと対応方法

spark.sql.execution.arrow.pyspark.enabledfalseとなるケース

前述の警告が表示されます。

from pyspark.sql.functions import col, array

df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array())  ## lit([]) でも同様の結果となる

pdf = df.toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  A null type field may not be non-nullable
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)

image.png

上記コードの結果に基づき PyArrow への変換を実施すると、エラーとなりました。

print(f"fields:       {df.schema.fields}")
print(f"dataType:     {df.schema.fields[1].dataType.elementType}")
print(f"containsNull: {df.schema.fields[1].dataType.containsNull}")


import pyarrow as pa

print(
    pa.field(
        "element",
        pa.null(),  # NullType
        df.schema.fields[1].dataType.containsNull,
    )
)
ValueError: A null type field may not be non-nullable

image.png

データ型を明示することによる対応方法

CAST によりデータ変換を実施することで、警告が表示されなくなりました。

from pyspark.sql.functions import col, array

df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array().cast('array<void>'))
pdf = df.toPandas()

image.png

上記コードの結果に基づき PyArrow への変換を実施すると、想定通りに変換できました。

print(f"fields:       {df.schema.fields}")
print(f"dataType:     {df.schema.fields[1].dataType.elementType}")
print(f"containsNull: {df.schema.fields[1].dataType.containsNull}")
import pyarrow as pa

print(
    pa.field(
        "element",
        pa.null(),  # NullType
        df.schema.fields[1].dataType.containsNull,
    )
)

image.png

まとめ

この記事は、Databricks (Spark) で Spark dataframe と Pandas の変換時に PyArrow が利用されないことによるエラー事象について説明しました。このエラーは、spark.sql.execution.arrow.pyspark.enabledがfalseの場合に発生します。この設定がfalseになると、ドライバーノードとワーカーノード間でデータの通信が行われ、その結果、spark.rpc.message.maxSizeの上限を超える可能性があります。

記事では、この問題の再現方法と対応策を詳しく説明しています。具体的には、Array 型のカラムを追加する際にarrayメソッドをそのまま指定すると、上記の警告が表示されることが確認されました。これを解決するための対応策として、カラムを追加する際にデータ型を明示的に CAST する方法が提案しています。

2
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?