背景
異なるパーティション数のRDDをJoin した後、Join後のRDDのパーティション数が少ない方に合わせられてしまうという現象を確認。
後続の処理のパフォーマンスに影響を与えるので、Join後のRDDのパーティション数は多い方に合わせられる方が望ましい。
検証
取りあえず、spark-shell でその動作を検証してみました。
spark version:2.3.2
1.パーティション数1000のRDDを作る
scala> val rdd1 = sc.parallelize(1 to 10000, 1000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val pairRdd1 = rdd1.map(r => (r, r))
pairRdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:25
scala> pairRdd1.partitions.size
res0: Int = 100
2.パーティション数10のRDDを作る
scala> val rdd2 = sc.parallelize(1 to 100, 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val pairRdd2 = rdd2.map(r => (r, r))
pairRdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:25
scala> pairRdd2.partitions.size
res1: Int = 10
3.Join する
scala> val joined = pairRdd1.join(pairRdd2)
joined: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:27
scala> joined.partitions.size
res2: Int = 1000
Join後のRDDは多い方のパーティション数(1000)の方に合わせられた結果になりました。
想定していた結果は少ない方のパーティション数(10)になることでしたが、基本的な仕様としては多い方に合わせられるようです。
scala> joined.toDebugString
res3: String =
(1000) MapPartitionsRDD[6] at join at <console>:27 []
| MapPartitionsRDD[5] at join at <console>:27 []
| CoGroupedRDD[4] at join at <console>:27 []
+-(1000) MapPartitionsRDD[1] at map at <console>:25 []
| | ParallelCollectionRDD[0] at parallelize at <console>:24 []
+-(10) MapPartitionsRDD[3] at map at <console>:25 []
| ParallelCollectionRDD[2] at parallelize at <console>:24 []
少ない方のパーティション数になるケースも存在するのか
詳細は省きますが、(暗黙的な)パーティション数の決定については、複数の要因が影響するようです。
基本的に spark のパラメータの一つである "spark.default.parallelism" が設定されていない場合、大きい方のパーティション数が採用されるように思われます。
もし、"spark.default.parallelism" が設定されている場合は、"spark.default.parallelism" に設定されている値とパーティション数を比較しますが、単純に大きい方が採用されるわけではなく、小さい方の値が採用されるケースもあるようです。
また、このパーティション数の決定ロジックについては、RDDとDataFrameでも異なるようです。
Join 時にパーティション数を明示する
暗黙的なパーティション数の決定には、理解しないといけないロジックがあり、意図しない結果が発生する可能性があるので、明示した方が安全そうです。
※別途、シャッフルの回数が増えることもないはず
scala> val joined2 = pairRdd1.join(pairRdd2, 200)
joined2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[9] at join at <console>:27
scala> joined2.partitions.size
res5: Int = 200
scala> joined2.toDebugString
res6: String =
(200) MapPartitionsRDD[9] at join at <console>:27 []
| MapPartitionsRDD[8] at join at <console>:27 []
| CoGroupedRDD[7] at join at <console>:27 []
+-(1000) MapPartitionsRDD[1] at map at <console>:25 []
| | ParallelCollectionRDD[0] at parallelize at <console>:24 []
+-(10) MapPartitionsRDD[3] at map at <console>:25 []
| ParallelCollectionRDD[2] at parallelize at <console>:24 []
結論
暗黙的なパーティション数の決定は、Spark自身が最適なロジックを元に行っているはずですが、(個人的な経験として)それがアプリケーション的に最適にならないケースも存在すると考えていた方が良いかもしれません。
暗黙的にParition数が決定されるロジックを理解していない場合、明示した方が安全そうです。