LoginSignup
7
1

More than 5 years have passed since last update.

SparkのDataFrameを合体させる

Posted at

動機

dataframeをunionするとき、カラムのスキーマが一致していないとできない。あとからテーブルにカラムが追加されてしまうと、新しいテーブルと古いテーブルをunionできなくなってしまう。

解決策

古いテーブルには強制的にnullのカラムを追加する。

from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Fix: the original snippet referenced `spark` without ever creating it,
# which raises NameError outside a pyspark shell / Databricks notebook.
spark = SparkSession.builder.getOrCreate()

# Sample "old" table with three columns.
# NOTE(review): column name "hight" (sic) is kept as-is — later snippets
# and the printed output depend on that exact name.
data = [("Tom", 21, 175.3), ("Mary", 30, 165.3)]
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
    T.StructField("hight", T.DoubleType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()
+----+---+-----+
|name|age|hight|
+----+---+-----+
| Tom| 21|175.3|
|Mary| 30|165.3|
+----+---+-----+
# "New" table: same columns as df plus an extra "weight" column.
data2 = [
    ("Bob", 23, 165.3, 60.2),
    ("Jack", 30, 165.3, 82.4),
    ("Judy", 22, 172.3, 63.4),
]
# Build the StructType from a (name, type) field list; all columns nullable.
fields2 = [
    ("name", T.StringType()),
    ("age", T.IntegerType()),
    ("hight", T.DoubleType()),
    ("weight", T.DoubleType()),
]
schema2 = T.StructType([T.StructField(n, t, True) for n, t in fields2])
df2 = spark.createDataFrame(data2, schema2)
df2.show()
+----+---+-----+------+
|name|age|hight|weight|
+----+---+-----+------+
| Bob| 23|165.3|  60.2|
|Jack| 30|165.3|  82.4|
|Judy| 22|172.3|  63.4|
+----+---+-----+------+
df2.union(df)# Error!! union requires both frames to have the same column count
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:


~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:


Py4JJavaError: An error occurred while calling o117.union.
: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 4 columns and the second table has 3 columns;;
'Union
:- AnalysisBarrier
:     +- LogicalRDD [name#107, age#108, hight#109, weight#110], false
+- AnalysisBarrier
      +- LogicalRDD [name#101, age#102, hight#103], false

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:245)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:242)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:242)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:3307)
    at org.apache.spark.sql.Dataset.union(Dataset.scala:1829)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)



During handling of the above exception, another exception occurred:


AnalysisException                         Traceback (most recent call last)

<ipython-input-50-eb00d5821249> in <module>()
----> 1 df2.union(df)#エラー!!


~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in union(self, other)
   1336         Also as standard in SQL, this function resolves columns by position (not by name).
   1337         """
-> 1338         return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
   1339 
   1340     @since(1.3)


~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:


~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)


AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 4 columns and the second table has 3 columns;;\n'Union\n:- AnalysisBarrier\n:     +- LogicalRDD [name#107, age#108, hight#109, weight#110], false\n+- AnalysisBarrier\n      +- LogicalRDD [name#101, age#102, hight#103], false\n"
def superunionall(newdf, olddf):
    """Union ``olddf`` onto ``newdf`` even when their schemas differ.

    Columns present in ``newdf`` but missing from ``olddf`` are added to
    ``olddf`` as typed null columns.  Columns present only in ``olddf`` are
    dropped by the final ``select``.  Columns present in both frames under
    the same name but with a different type are cast to ``newdf``'s type.
    (The original implementation diffed on (name, dtype) pairs, so such a
    column was replaced by an all-null column, silently discarding the old
    data; casting preserves it.)

    Args:
        newdf (DataFrame): reference frame; its schema defines the result.
        olddf (DataFrame): reshaped to newdf's column layout, then unioned
            below newdf.

    Returns:
        DataFrame: newdf followed by the reshaped olddf.  Note that
        ``union`` matches columns by position, which the ``select`` below
        guarantees.
    """
    old_types = dict(olddf.dtypes)
    for name, dtype in newdf.dtypes:
        if name not in old_types:
            # Missing column: add a null of the expected type.
            olddf = olddf.withColumn(name, F.expr("cast(null as {})".format(dtype)))
        elif old_types[name] != dtype:
            # Same name, different type: cast the existing values rather
            # than destroying them with nulls.
            olddf = olddf.withColumn(name, F.col(name).cast(dtype))
    # Reorder (and implicitly drop extras) so columns line up positionally.
    olddf = olddf.select(*[name for name, _ in newdf.dtypes])
    return newdf.union(olddf)
# Reshape df (the old 3-column frame) to df2's layout, then stack them.
uniondf = superunionall(newdf=df2, olddf=df)
uniondf.show()
+----+---+-----+------+
|name|age|hight|weight|
+----+---+-----+------+
| Bob| 23|165.3|  60.2|
|Jack| 30|165.3|  82.4|
|Judy| 22|172.3|  63.4|
| Tom| 21|175.3|  null|
|Mary| 30|165.3|  null|
+----+---+-----+------+
7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1