Optimize conversion between PySpark and pandas DataFrames | Databricks on AWS [2021/6/11時点]の翻訳です。
Apache Arrowは、JVMとPythonプロセス間の転送を効率的に行うために、Apache Sparkで使用されるインメモリ、列指向データフォーマットです。pandasとNumpyデータを取り扱うPython開発者にとってメリットがあります。しかし、この機能は自動化されておらず、完全に機能を利用し、互換性を維持するためには、設定、コードに対して若干の修正が必要となります。
PyArrowのバージョン
PyArrowはDatabricksランタイムにインストールされています。それぞれのDatabricksランタイムバージョンで利用できるPyArrowのバージョンに関しては、Databricks runtime release notesを参照してください。
サポートするSQLタイプ
MapType
、TimestampType
のArrayType
、そしてネストされたStructType
以外のすべてのSpark SQLデータタイプは、Arrowベースの変換でサポートされています。StructType
はpandas.Series
ではなく、pandas.DataFrame
として表現されます。BinaryType
は、0.10.0以降のPyArrowでのみサポートされています。
PySparkデータフレーム、pandasデータフレームの変換
PySparkデータフレームをtoPandas()
でpandasデータフレームに変換する際、createDataFrame(pandas_df)
を用いてpandasデータフレームからPySparkデータフレームを作成する際の最適化テクニックとしてArrowを利用することができます。これらのメソッドにおいてArrowを使用するためには、Spark configurationのspark.sql.execution.arrow.enabled
をtrue
に設定します。この設定はデフォルトでは無効化されています。
さらに、spark.sql.execution.arrow.enabled
による最適化は、Sparkで計算の前にエラーが発生した場合、非Arrow実装にフォールバックする場合があります。Spark設定spark.sql.execution.arrow.fallback.enabled
によって、この振る舞いを制御することができます。
サンプル
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
Arrowの最適化を用いた場合でも、Arrowを有効化していない場合と同じ結果を得ることができます。Arrowを用いた場合でも、toPandas()
はデータフレームにおける全てのレコードのコレクションをドライバープログラムに返却し、小規模のサブセットのデータに対して処理が行われます。
また、全てのSparkデータタイプがサポートされているわけではなく、サポートされていないタイプのカラムがある場合にはエラーとなります。createDataFrame()
でエラーが発生した場合、SparkはArrowを使用せずにデータフレームを作成します。