Motivation
When unioning DataFrames, the column schemas have to match. If a column is later added to a table, the new table can no longer be unioned with the old one.
Solution
Force-add the missing columns to the old table as nulls.
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

data = [("Tom", 21, 175.3), ("Mary", 30, 165.3)]
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
    T.StructField("hight", T.DoubleType(), True)
])
df = spark.createDataFrame(data, schema)
df.show()
+----+---+-----+
|name|age|hight|
+----+---+-----+
| Tom| 21|175.3|
|Mary| 30|165.3|
+----+---+-----+
data2 = [("Bob", 23, 165.3, 60.2), ("Jack", 30, 165.3, 82.4), ("Judy", 22, 172.3, 63.4)]
schema2 = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
    T.StructField("hight", T.DoubleType(), True),
    T.StructField("weight", T.DoubleType(), True)
])
df2 = spark.createDataFrame(data2, schema2)
df2.show()
+----+---+-----+------+
|name|age|hight|weight|
+----+---+-----+------+
| Bob| 23|165.3| 60.2|
|Jack| 30|165.3| 82.4|
|Judy| 22|172.3| 63.4|
+----+---+-----+------+
df2.union(df)  # Error!
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
Py4JJavaError: An error occurred while calling o117.union.
: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 4 columns and the second table has 3 columns;;
'Union
:- AnalysisBarrier
: +- LogicalRDD [name#107, age#108, hight#109, weight#110], false
+- AnalysisBarrier
+- LogicalRDD [name#101, age#102, hight#103], false
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:245)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:242)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:242)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:3307)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1829)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-50-eb00d5821249> in <module>()
----> 1 df2.union(df)  # Error!
~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in union(self, other)
1336 Also as standard in SQL, this function resolves columns by position (not by name).
1337 """
-> 1338 return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
1339
1340 @since(1.3)
~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 4 columns and the second table has 3 columns;;\n'Union\n:- AnalysisBarrier\n: +- LogicalRDD [name#107, age#108, hight#109, weight#110], false\n+- AnalysisBarrier\n +- LogicalRDD [name#101, age#102, hight#103], false\n"
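Before generalizing, the fix applied by hand to this specific pair looks like the following sketch (the column name and type are read off df2's schema shown above):

df_fixed = df.withColumn("weight", F.expr("cast(null as double)"))  # add the missing column as a typed null
df2.union(df_fixed.select(*df2.columns)).show()  # align the column order with df2, then union

The function below does the same thing generically, so any missing columns are filled in automatically.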
def superunionall(newdf, olddf):
    """Union-all newdf and olddf.

    Columns that newdf has but olddf lacks are added to olddf as null columns before the union.
    Columns that olddf has but newdf lacks are dropped from olddf before the union.

    Args:
        newdf (DataFrame): The reference DataFrame.
        olddf (DataFrame): Reshaped to match newdf's columns, then union-all'ed onto newdf.

    Returns:
        DataFrame: The result of union-all'ing newdf and olddf.
    """
    # (column name, type) pairs that newdf has but olddf lacks
    diff = set(newdf.dtypes) - set(olddf.dtypes)
    for i in diff:
        # Add each missing column to olddf as a null of the matching type
        olddf = olddf.withColumn(i[0], F.expr("cast(null as {})".format(i[1])))
    # Align olddf's columns and column order with newdf, dropping any extras
    newdfcolname = [i[0] for i in newdf.dtypes]
    olddf = olddf.select(*newdfcolname)
    unionalldf = newdf.union(olddf)
    return unionalldf
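A note on the design: F.expr("cast(null as {})".format(i[1])) builds a null column that already carries the right data type, so the union's schema stays consistent. Inside the loop, the same step could presumably also be written with a typed literal; both forms should behave the same for the simple types used here:

olddf = olddf.withColumn(i[0], F.lit(None).cast(i[1]))  # assumed-equivalent alternative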
uniondf = superunionall(df2, df)
uniondf.show()
+----+---+-----+------+
|name|age|hight|weight|
+----+---+-----+------+
| Bob| 23|165.3| 60.2|
|Jack| 30|165.3| 82.4|
|Judy| 22|172.3| 63.4|
| Tom| 21|175.3| null|
|Mary| 30|165.3| null|
+----+---+-----+------+
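As the docstring notes, the reverse direction also works: columns that olddf has but newdf lacks are simply dropped. Swapping the arguments should therefore give a three-column result (a sketch of the expected behavior, not captured output):

superunionall(df, df2).show()  # expected: only name, age, hight; the weight values from df2 are dropped

For reference, Spark 3.1 and later ship df2.unionByName(df, allowMissingColumns=True), which handles missing columns directly; a helper like the one above is mainly useful on older versions such as the 2.3 used here.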