環境
- Spark 2.4.4
- Apache Zeppelin 0.9.0-preview1
- macOS 10.15 において、 Dockerに環境を構築
- Scala
経緯(読み飛ばし可能)
- Sparkの検証、学習をしていると、partition ID を確認したくなることがあります。
- 今まで、mapPartitionsWithIndex()を使って、調べていました。
- itrの型の指定、DF<->RDDの変換が大変だと感じていました。
val df1 = spark.range(0, 4, 1, 3)
df1.show
val df2 = df1.rdd.mapPartitionsWithIndex((i:Int, itr:Iterator[java.lang.Long]) => itr.map(l => (l, i))).toDF("id", "partition id")
df2.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
+---+
+---+------------+
| id|partition id|
+---+------------+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 2|
+---+------------+
違う記述方法を発見
- 上記の方法以外でも、spark_partition_id() を呼べば partition ID を取得できそうです。
- Zeppelin上では、明示的にimportしなくても使えるに見受けられます。
import org.apache.spark.sql.functions.spark_partition_id
val df1 = spark.range(0, 4, 1, 3)
df1.show
val df2 = df1.select(col("id"), spark_partition_id().as("partition id"))
df2.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
+---+
+---+------------+
| id|partition id|
+---+------------+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 2|
+---+------------+