0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache spark RDD を Join した後の Partition 数

Posted at
背景

異なるパーティション数のRDDをJoin した後、Join後のRDDのパーティション数が少ない方に合わせられてしまうという現象を確認。
後続の処理のパフォーマンスに影響を与えるので、Join後のRDDのパーティション数は多い方に合わせられる方が望ましい。

検証

取りあえず、spark-shell でその動作を検証してみました。
spark version:2.3.2

1.パーティション数1000のRDDを作る

size1000
scala> val rdd1 = sc.parallelize(1 to 10000, 1000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val pairRdd1 = rdd1.map(r => (r, r))
pairRdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:25

scala> pairRdd1.partitions.size
res0: Int = 100

2.パーティション数10のRDDを作る

size10
scala> val rdd2 = sc.parallelize(1 to 100, 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val pairRdd2 = rdd2.map(r => (r, r))
pairRdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> pairRdd2.partitions.size
res1: Int = 10

3.Join する

join
scala> val joined = pairRdd1.join(pairRdd2)
joined: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:27

scala> joined.partitions.size
res2: Int = 1000

Join後のRDDは多い方のパーティション数(1000)の方に合わせられた結果になりました。
想定していた結果は少ない方のパーティション数(10)になることでしたが、基本的な仕様としては多い方に合わせられるようです。

scala> joined.toDebugString
res3: String =
(1000) MapPartitionsRDD[6] at join at <console>:27 []
  |    MapPartitionsRDD[5] at join at <console>:27 []
  |    CoGroupedRDD[4] at join at <console>:27 []
  +-(1000) MapPartitionsRDD[1] at map at <console>:25 []
  |   |    ParallelCollectionRDD[0] at parallelize at <console>:24 []
  +-(10) MapPartitionsRDD[3] at map at <console>:25 []
     |   ParallelCollectionRDD[2] at parallelize at <console>:24 []
少ない方のパーティション数になるケースも存在するのか

詳細は省きますが、(暗黙的な)パーティション数の決定については、複数の要因が影響するようです。
基本的に spark のパラメータの一つである "spark.default.parallelism" が設定されていない場合、大きい方のパーティション数が採用されるように思われます。
もし、"spark.default.parallelism" が設定されている場合は、"spark.default.parallelism" に設定されている値とパーティション数を比較しますが、単純に大きい方が採用されるわけではなく、小さい方の値が採用されるケースもあるようです。
また、このパーティション数の決定ロジックについては、RDDとDataFrameでも異なるようです。

Join 時にパーティション数を明示する

暗黙的なパーティション数の決定には、理解しないといけないロジックがあり、意図しない結果が発生する可能性があるので、明示した方が安全そうです。
※別途、シャッフルの回数が増えることもないはず

joinwithnumber
scala> val joined2 = pairRdd1.join(pairRdd2, 200)
joined2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[9] at join at <console>:27

scala> joined2.partitions.size
res5: Int = 200

scala> joined2.toDebugString
res6: String =
(200) MapPartitionsRDD[9] at join at <console>:27 []
  |   MapPartitionsRDD[8] at join at <console>:27 []
  |   CoGroupedRDD[7] at join at <console>:27 []
  +-(1000) MapPartitionsRDD[1] at map at <console>:25 []
  |   |    ParallelCollectionRDD[0] at parallelize at <console>:24 []
  +-(10) MapPartitionsRDD[3] at map at <console>:25 []
     |   ParallelCollectionRDD[2] at parallelize at <console>:24 []
結論

暗黙的なパーティション数の決定は、Spark自身が最適なロジックを元に行っているはずですが、(個人的な経験として)それがアプリケーション的に最適にならないケースも存在すると考えていた方が良いかもしれません。
暗黙的にParition数が決定されるロジックを理解していない場合、明示した方が安全そうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?