はじめに
PySparkのDataFrameの縦結合について、意外に知られていない点を備忘としてまとめる。
なお、記事の内容は、Spark 2.4に基づく。
PySparkの縦結合
縦結合系メソッドの違いについて
DataFrameの縦結合系のメソッドは3種類存在する。
- union
- v2.0より
- unionAll
- v1.3より
- unionByName
- v2.3より
unionとunionAllの違い
uniontとunionAllの2メソッドの間には、実は帰納的な違いがなく、どちらもシンプルに2つのDataFrameを縦結合する、
SQLをイメージしていると、unionでは重複制御が行われると勘違いしがちだが、どちらの場合も重複制御が行われない。
そのため、重複制御が必要な場合は、縦結合の後、dinstinctメソッドを使用する必要がある。
なお、v2.0以上では、unionの使用が推奨されている。
df1 = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6), (7, 8, 9)], ['x', 'y', 'z'])
df_union = df1.union(df2)
df_unionAll = df1.unionAll(df2)
print('df1')
df1.show()
print('df2')
df2.show()
# df1
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# +---+---+---+
#
# df2
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+
print('union')
df_union.show()
print('unionAll')
df_unionAll.show()
# union
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+
#
# unionAll
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+
unionとunionByNameの違い
unionとunionByNameの違いは、縦結合時にDataFrameの列名を参照するかにある。
unionは、2つのDataFrameの1番目の列同士を結合、2番目の列同士を結合・・・のように、DataFrame内の列の並びを加味し結合を行う。つまり、unionの場合、結合時に列名を見ていない。同じ列をもつDataFrameであっても、並び順が異なる場合は、メソッドの呼び出し元のDataFrameの列名を基に結合され、同じ列のもの同士で結合されない。一方、unionByNameは、それぞれのDataFrameの列名を参照し、同じ列名同士を結合する。
そのため、結合したい2つのDataFrameのスキーマが統一されているのであれば、unionByNameを使うのが無難である。
df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['z', 'x', 'y'])
df_union = df1.union(df2)
df_unionByName = df1.unionByName(df2)
print('df1')
df1.show()
print('df2')
df2.show()
# df1
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# +---+---+---+
#
# df2
# +---+---+---+
# | z| x| y|
# +---+---+---+
# | 4| 5| 6|
# +---+---+---+
print('union')
df_union.show()
print('unionByName')
df_unionByName.show()
# union
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# +---+---+---+
#
# unionByName
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 5| 6| 4|
# +---+---+---+
結合時のCastについて
DataFrameの縦結合時に、結合する列の型が異なる際、暗黙的にCastされることがある。
(されずにエラーになるパターンもある)
暗黙的なCastの中でも、特に、Integerなどの数値型がStringに変換されるパターンが厄介で、これはフラグ値の取り扱いの違いなどで十分起こりうるので注意が必要である。
ここでは、unionでの例を掲載しているが、unionByNameでも同様である(と経験的に認識している)。
from pyspark.sql.functions import col
from pyspark.sql.types import *
df = spark.createDataFrame([(1, 'x', True)], ['long', 'str', 'bool']).withColumn('int', col('long').cast('int'))
df.show()
df.printSchema()
# +------+---+----+---+
# |bigint|str|bool|int|
# +------+---+----+---+
# | 1| x|true| 1|
# +------+---+----+---+
# root
# |-- bigint: long (nullable = true)
# |-- str: string (nullable = true)
# |-- bool: boolean (nullable = true)
# |-- int: integer (nullable = true)
df.select('int').union(df.select('str')).printSchema()
# root
# |-- int: string (nullable = true)
df.select('int').union(df.select('long')).printSchema()
# root
# |-- int: long (nullable = true)
# これはエラーになる
# df.select('bool').union(df.select('str'))
複数DataFrameの縦結合について
縦結合系のメソッドは、DataFrameを2つ結合することにしか対応できない。
結合したいDataFrameが3つ以上ある場合、以下のように処理を行うとよい。
from functools import reduce
from pyspark.sql import DataFrame
df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['x', 'y', 'z'])
df3 = spark.createDataFrame([(7, 8, 9)], ['x', 'y', 'z'])
df = reduce(DataFrame.unionByName, [df1, df2, df3])
df.show()
# +---+---+---+
# | x| y| z|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+