Notes from a time I got an error trying to join a DataFrame with itself.
Environment:
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.18
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.
The problem
Suppose we have a DataFrame like this. It holds an employee ID, a name, and the ID of the employee's manager.
>>> employee = spark.createDataFrame([[1, 'Foo', 3], [2, 'Bar', 3], [3, 'Baz', None]], ['emp_id', 'name', 'manager_id'])
>>> employee.show()
+------+----+----------+
|emp_id|name|manager_id|
+------+----+----------+
|     1| Foo|         3|
|     2| Bar|         3|
|     3| Baz|      null|
+------+----+----------+
Now, the goal: we want each employee's name together with their manager's name, by joining this DataFrame with itself to produce something like this.
+------+----+----------+------------+
|emp_id|name|manager_id|manager_name|
+------+----+----------+------------+
|     1| Foo|         3|         Baz|
|     2| Bar|         3|         Baz|
|     3| Baz|      null|        null|
+------+----+----------+------------+
What doesn't work
Doing it naively fails with the following error.
>>> d = employee.join(employee, employee.manager_id == employee.emp_id, 'left')
pyspark.sql.utils.AnalysisException: Column manager_id#47L, emp_id#45L are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
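(Incidentally, the last sentence of the message notes that the ambiguity check itself can be switched off. A sketch of that toggle, in case you want to experiment; note that disabling the check only silences the detection and does nothing to resolve which side of the join a column actually refers to, so I don't use it here:)
>>> spark.conf.set('spark.sql.analyzer.failAmbiguousSelfJoin', 'false')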
DataFrame.alias
lets you attach an alias to the DataFrame, but the join still fails.
>>> manager = employee.alias('manager')
>>> d = employee.join(manager, employee.manager_id == manager.emp_id, 'left')
pyspark.sql.utils.AnalysisException: Column manager_id#221L are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
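The other fix the error message suggests is to alias both sides and then refer to columns by qualified name strings via pyspark.sql.functions.col, rather than by attribute access on the original DataFrame (attribute access like employee.manager_id still points at the un-aliased plan, which is presumably why the alias above didn't help). A minimal sketch of that approach, untested against this exact setup:
>>> from pyspark.sql import functions as F
>>> e = employee.alias('e')
>>> m = employee.alias('m')
>>> d = e.join(m, F.col('e.manager_id') == F.col('m.emp_id'), 'left')
>>> d.select('e.emp_id', 'e.name', 'e.manager_id', F.col('m.name').alias('manager_name'))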
Solution
DataFrame.withColumnRenamed
Renaming every column with it makes the join work.
>>> manager = employee.withColumnRenamed('emp_id', 'manager_emp_id').withColumnRenamed('name', 'manager_name').withColumnRenamed('manager_id', 'manager_manager_id')
>>> d = employee.join(manager, employee.manager_id == manager.manager_emp_id, 'left')
>>> d.select('emp_id', 'name', 'manager_id', 'manager_name').show()
+------+----+----------+------------+
|emp_id|name|manager_id|manager_name|
+------+----+----------+------------+
|     1| Foo|         3|         Baz|
|     2| Bar|         3|         Baz|
|     3| Baz|      null|        null|
+------+----+----------+------------+
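With more than a handful of columns, chaining withColumnRenamed gets verbose. The same bulk rename can be written in one pass with select and alias; a sketch, assuming a manager_ prefix on every column as in the example above:
>>> from pyspark.sql import functions as F
>>> manager = employee.select([F.col(c).alias('manager_' + c) for c in employee.columns])
>>> d = employee.join(manager, employee.manager_id == manager.manager_emp_id, 'left')
Because every column on the right side now has a distinct name, attribute access in the join condition is unambiguous and no alias is needed.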