概要
Databricks (Spark) にて Spark dataframe と Pandas 変換時の PyArrow が利用されないことによるエラーに遭遇したため、事象の再現方法と対応方法を共有します。
Spark dataframe と Pandas 変換時の PyArrow が利用されない場合(spark.sql.execution.arrow.pyspark.enabled
がfalse
の場合)には、ドライバーノードとワーカーノードでデータの通信が行われ、spark.rpc.message.maxSize
の上限を超えることで次のエラーが発生します。メッセージではspark.rpc.message.maxSize
の上限を上げることが提案されていますが、その上限を上げる対応方法を実施しませんでした。
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 29:0 was 277567143 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
Databricks では、基本的にはspark.sql.execution.arrow.pyspark.enabled
の設定値がデフォルトでtrue
となっておりますが、特定の状況下では次のような警告とともにspark.sql.execution.arrow.pyspark.enabled
の設定値がfalse
となります。
/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
A null type field may not be non-nullable
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
warn(msg)
Array 型のカラムを追加する際にarray
メソッドをそのまま指定している場合に、上記の警告が表示されることがわかりました。その対応方法として、明示的に CAST する方法で警告が表示されなくなりました。
from pyspark.sql.functions import col, array
df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array()) ## lit([]) でも同様の結果となる
pdf = df.toPandas()
from pyspark.sql.functions import col, array
df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array().cast('array<void>'))
pdf = df.toPandas()
本記事では、調査手順、エラーの再現方法、及び、その対応方法を共有します。
調査手順
spark のレポジトリーをベースに Github Copilot Chat で本件について質問したところ、次のような回答が返ってきました。私が遭遇したケースでは、PySparkとArrowの間に互換性の問題があったのではないかと仮説をたてました。
下記の警告がArrow最適化が何らかの理由により表示されるケースを箇条書きで教えて。日本語で回答して
/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true;
spark.sql.execution.arrow.pyspark.enabledがtrueに設定されているため、Arrow最適化が試みられ、以下のようなケースで警告が表示される可能性があります:
- Arrowがインストールされていない場合。
- PySparkとArrowの間に互換性の問題がある場合。
- データ形式がArrowによる最適化をサポートしていない場合。
- Arrowの最適化がメモリ制限を超える可能性がある大規模なデータフレームに対して使用された場合。
具体的な警告メッセージは問題の詳細によりますが、上記のいずれかのシナリオが原因である可能性が高いです。
続けて、エラーに関するコードを Github Copilot Chat で提示してもらいました。リンク先のコードを確認したところ、to_arrow_schema
メソッドがデータ型の変換を実施していることがわかりました。
そのエラーに関係するコードを提示して
to_arrow_schema
メソッド内で利用されているto_arrow_type
における ArrayType の変換に関するコードを確認しました。
PyArrow 変換時のエラーが発生するケースとして、次のようなコードとなる場合(NullType
かつnullable
がFalse
)であることを特定できました。A null type field may not be non-nullable
というspark.sql.execution.arrow.pyspark.enabled
がfalse
となった際の警告文内に記載されている一部のメッセージと一致しました。nullable
をTrue
にするためには、カラム追加時に CAST すればよいことが、後述する対応方法を把握することができました。
import pyarrow as pa
print(
pa.field(
"element",
pa.null(),
False,
)
)
ValueError: A null type field may not be non-nullable
事前準備
テストスキーマとテーブルの準備
%sql
CREATE SCHEMA if NOT EXISTS hive_metastore.oom_test;
CREATE OR REPLACE TABLE hive_metastore.oom_test.partsupp (
ps_partkey long,
ps_suppkey long,
ps_availqty int,
ps_supplycost float,
ps_comment string
);
filepath = "dbfs:/databricks-datasets/tpch/data-001/partsupp/partsupp.tbl"
tgt_tbl = "hive_metastore.oom_test.partsupp"
schema = """
ps_partkey long,
ps_suppkey long,
ps_availqty int,
ps_supplycost float,
ps_comment string
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("sep", "|")
.load(filepath)
)
df.write.mode("overwrite").saveAsTable(tgt_tbl)
spark.sql.execution.arrow.pyspark.enabled
がfalse
の場合にエラーになることを確認
spark.sql.execution.arrow.pyspark.enabled
がTrue
の場合の動作確認
正常終了しました。
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
tgt_tbl = "hive_metastore.oom_test.partsupp"
df = spark.table(tgt_tbl)
pdf = df.toPandas()
new_col = f"ps_comment_1"
pdf[new_col] = pdf['ps_comment'] + f"_1"
df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())
spark.sql.execution.arrow.pyspark.enabled
がFalse
の場合の動作確認
エラーが発生しました。
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
tgt_tbl = "hive_metastore.oom_test.partsupp"
df = spark.table(tgt_tbl)
pdf = df.toPandas()
new_col = f"ps_comment_1"
pdf[new_col] = pdf['ps_comment'] + f"_1"
df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 29:0 was 277567143 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
spark.sql.execution.arrow.pyspark.enabled
がFalse
、かつ、Pandas のデータサイズが大きくなる処理を実行する場合
Spark ドライバーが異常終了します。
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
tgt_tbl = "hive_metastore.oom_test.partsupp"
df = spark.table(tgt_tbl)
pdf = df.toPandas()
for i in range(1, 6):
new_col = f"ps_comment_{i}"
pdf[new_col] = pdf['ps_comment'] + f"_{i}"
df_from_pandas = spark.createDataFrame(pdf)
print(df_from_pandas.count())
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.
spark.sql.execution.arrow.pyspark.enabled
がfalse
となるケースと対応方法
spark.sql.execution.arrow.pyspark.enabled
がfalse
となるケース
前述の警告が表示されます。
from pyspark.sql.functions import col, array
df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array()) ## lit([]) でも同様の結果となる
pdf = df.toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py:124: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
A null type field may not be non-nullable
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
warn(msg)
上記コードの結果に基づき PyArrow への変換を実施すると、エラーとなりました。
print(f"fields: {df.schema.fields}")
print(f"dataType: {df.schema.fields[1].dataType.elementType}")
print(f"containsNull: {df.schema.fields[1].dataType.containsNull}")
import pyarrow as pa
print(
pa.field(
"element",
pa.null(), # NullType
df.schema.fields[1].dataType.containsNull,
)
)
ValueError: A null type field may not be non-nullable
データ型を明示することによる対応方法
CAST によりデータ変換を実施することで、警告が表示されなくなりました。
from pyspark.sql.functions import col, array
df = spark.createDataFrame([(1,)], 'col_1 long')
df = df.withColumn('array_column', array().cast('array<void>'))
pdf = df.toPandas()
上記コードの結果に基づき PyArrow への変換を実施すると、想定通りに変換できました。
print(f"fields: {df.schema.fields}")
print(f"dataType: {df.schema.fields[1].dataType.elementType}")
print(f"containsNull: {df.schema.fields[1].dataType.containsNull}")
import pyarrow as pa
print(
pa.field(
"element",
pa.null(), # NullType
df.schema.fields[1].dataType.containsNull,
)
)
まとめ
この記事は、Databricks (Spark) で Spark dataframe と Pandas の変換時に PyArrow が利用されないことによるエラー事象について説明しました。このエラーは、spark.sql.execution.arrow.pyspark.enabledがfalseの場合に発生します。この設定がfalseになると、ドライバーノードとワーカーノード間でデータの通信が行われ、その結果、spark.rpc.message.maxSizeの上限を超える可能性があります。
記事では、この問題の再現方法と対応策を詳しく説明しています。具体的には、Array 型のカラムを追加する際にarrayメソッドをそのまま指定すると、上記の警告が表示されることが確認されました。これを解決するための対応策として、カラムを追加する際にデータ型を明示的に CAST する方法が提案しています。