Pandas を利用して作ったロジックを PySpark を使う処理系(たとえば Databricks)に持っていく場合などに、それぞれのDataFrameを変換することがありますが、その際に気をつけること共有します。
この記事の例は Databricks で実行することを想定しており、spark
はプリセットの SparkSession
オブジェクトだと解釈してください。
PySpark → Pandas
pyspark.sql.DataFrame
オブジェクトには toPandas()
というメソッドがあるため、これを使えば変換できます。
import io
data = '''id,value
one,hoge
two,fuga
'''
df = spark.read.csv(io.StringIO(data)) # df は pyspark.sql.DataFrame
pdf = df.toPandas() # pdf は pandas.DataFrame
Pandas → PySpark
SparkSession
オブジェクトには createDataFrame
というメソッドがあるため、これを使うと pandas.DataFrame
を pyspark.sql.DataFrame
に変換できます。
import pandas as pd
pdf = pd.read_csv(io.StringIO(data)) # pdf は pandas.DataFrame
df = spark.createDataFrame(pdf)
ただし、 pandas.DataFrame
の Index としてデフォルトの値(1から始まる正数列)以外を使っている場合は、 変換の際に Index の情報が失われてしまうので注意が必要です。
例えば
import pandas as pd
pdf = pd.read_csv(io.StringIO(data), index_col='id') # pdf は pandas.DataFrame
df = spark.createDataFrame(pdf)
とすると、CSVの id
列に記載されていた情報が失われます。
これを防止したいのであれば、pandas.DataFrame.reset_index()
を利用し、前述の createDataFrame
の部分を以下に差し替えると Index に入っていた情報を取り込むことができます。
df = spark.createDataFrame(pdf.reset_index(drop=False))