背景・目的
以前、Sparkのファイルとパーティションの関係について確認してみた という記事で、読み込むファイルと、ファイルフォーマット、パラメータの設定により、Spark内部で取り扱うパーティション数がどのように変化するかを確認しました。
今回は、分割可能なファイルフォーマット(JSON)を使用して、下記のパラメータを変更してどのように、パーティション数が変化するかを検証してみます。
- spark.default.parallelism
- spark.sql.files.minPartitionNum
まとめ
- spark.default.parallelism
- EMRなどのデフォルトクラスタマネージャーのYARNでは、ExecutorのCore数または、2のどちらか大きい方が採用される。
- ファイルには効果がない
- spark.sql.files.minPartitionNum
-
今回の検証では、変化が見られなかった。
- 保証されていないと記載されていたので、何か他の要因があり得るかもしれません。
- 引き続き、調査する予定ですが、分割する場合は、
spark.sql.files.maxPartitionBytes
で調整したほうが無難と思いました。
-
今回の検証では、変化が見られなかった。
概要
Sparkのconfigurationから抜粋。
spark.default.parallelism
For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
Local mode: number of cores on the local machine
Mesos fine grained mode: 8
Others: total number of cores on all executor nodes or 2, whichever is larger
Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.
- reduceByKey や join などの分散シャッフル操作の場合、親 RDD 内のパーティションの最大数。
- 親 RDD を使用しない並列化などの操作の場合、クラスターマネージャーによって異なる
- ローカル モード: ローカル マシン上のコアの数
- Mesos fine grained mode: 8
- Others: すべてのエグゼキュータノード上のコアの合計数または 2 のどちらか大きい方
- ユーザーによって設定されていない場合に、join、reduceByKey、Parallelize などの変換によって返される RDD のデフォルトのパーティション数。
spark.sql.files.minPartitionNum
The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is spark.default.parallelism. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
- 推奨される (保証されていない) 分割ファイル パーティションの最小数。
- 設定されていない場合、デフォルト値はspark.default.Parallelismが採用される。
- この構成は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効。
実践
上記の仕様を踏まえて、下記のシナリオを検証する。
- spark.default.parallelismのみ設定し、パーティション数に変化があるか
- Coreノード(SparkのExecutorのCore数)を変更し、spark.default.parallelismとパーティション数に変化があるか
- spark.sql.files.minPartitionNumを設定し、パーティション数に変化があるか
前提
下記のファイルを作成します。
$cat test.json
{"id":"1","value":1}
{"id":"2","value":2}
{"id":"3","value":3}
{"id":"4","value":4}
{"id":"5","value":5}
{"id":"6","value":6}
{"id":"7","value":7}
{"id":"8","value":8}
{"id":"9","value":9}
{"id":"10","value":10}
{"id":"11","value":11}
{"id":"12","value":12}
{"id":"13","value":13}
{"id":"14","value":14}
{"id":"15","value":15}
{"id":"16","value":16}
{"id":"17","value":17}
{"id":"18","value":18}
{"id":"19","value":19}
{"id":"20","value":20}
$
1. spark.default.parallelism のみ設定し、パーティション数に変化があるか確認
ファイルから読み込む
ファイルから読み込んだときには、park.default.parallelismは、効果ありません。
ユーザーによって設定されていない場合に、join、reduceByKey、Parallelize などの変換によって返される RDD のデフォルトのパーティション数。
ドキュメントに書いてあるように、join、reduceByKey、Parallelizeのみ、有効なようです。
-
spark.default.parallelismを4に設定し、下記のコードを実行します。
from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.default.parallelism", "4") print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism'))) spark = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() df1 = spark.read.json("s3://xxxxx/input/split/test.json") df1.show() print("Partitions [{0}]".format(df1.rdd.getNumPartitions())) df1.rdd.glom().collect()
-
結果は下記のとおりです。パーティションは1つです。(分割されません。)
Partitions [1] [ [ Row(id='1', value=1), Row(id='2', value=2), Row(id='3', value=3), Row(id='4', value=4), Row(id='5', value=5), Row(id='6', value=6), Row(id='7', value=7), Row(id='8', value=8), Row(id='9', value=9), Row(id='10', value=10), Row(id='11', value=11), Row(id='12', value=12), Row(id='13', value=13), Row(id='14', value=14), Row(id='15', value=15), Row(id='16', value=16), Row(id='17', value=17), Row(id='18', value=18), Row(id='19', value=19), Row(id='20', value=20) ] ]
-
念の為、
spark.default.parallelism
を2に変えて試してみます。from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.default.parallelism", "10") print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism'))) spark = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() df1 = spark.read.json("s3://xxxxx/input/split/test.json") df1.show() print("Partitions [{0}]".format(df1.rdd.getNumPartitions())) df1.rdd.glom().collect()
-
結果を確認します。やはり、パーティションは変更されません。(1のままです。)
Partitions [1]
SparkContextのparallelizeから作成する
-
spark.default.parallelismを1に設定し、下記のコードを実行します。
from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.default.parallelism", "1") print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism'))) print("spark.executor.cores[{0}]".format(spark.conf.get('spark.executor.cores'))) schema = T.StructType([ T.StructField('id', T.StringType()), T.StructField('value', T.LongType()) ]) list = [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10), (11,11), (12,12), (13,13), (14,14), (15,15), (16,16), (17,17), (18,18), (19,19), (20,20)] rdd = sc.parallelize(list) print("Partitions [{0}]".format(rdd.getNumPartitions())) rdd.glom().collect()
-
結果は下記のとおりです。パーティションは2つです。(分割されました。)
spark.default.parallelism[1] spark.executor.cores[4] Partitions [2] [ [ Row(id='1', value=1), Row(id='2', value=2), Row(id='3', value=3), Row(id='4', value=4), Row(id='5', value=5) ], [ Row(id='6', value=6), Row(id='7', value=7), Row(id='8', value=8), Row(id='9', value=9), Row(id='10', value=10) ], [ Row(id='11', value=11), Row(id='12', value=12), Row(id='13', value=13), Row(id='14', value=14), Row(id='15', value=15) ], [ Row(id='16', value=16), Row(id='17', value=17), Row(id='18', value=18), Row(id='19', value=19), Row(id='20', value=20) ] ]
2. Coreノード(SparkのExecutorのCore数)を変更し、spark.default.parallelismとパーティション数に変化があるか確認
- Others: すべてのエグゼキュータノード上のコアの合計数または 2 のどちらか大きい方
Excutor(のCore)を増やして、変化があるか確認しました。
ExecutorあたりのCore数が4つ。ノードが3つなので、12 パーティションとなりました。
-
下記のコードを実行します。
from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.default.parallelism", "1") print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism'))) print("spark.executor.cores[{0}]".format(spark.conf.get('spark.executor.cores'))) schema = T.StructType([ T.StructField('id', T.StringType()), T.StructField('value', T.LongType()) ]) list = [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10), (11,11), (12,12), (13,13), (14,14), (15,15), (16,16), (17,17), (18,18), (19,19), (20,20)] rdd = sc.parallelize(list) print("Partitions [{0}]".format(rdd.getNumPartitions())) rdd.glom().collect()
-
結果は下記のとおりです。パーティションは12個に分割されました。
spark.default.parallelism[1] spark.executor.cores[4] Partitions [12] [ [ (1, 1) ], [ (2, 2), (3, 3) ], [ (4, 4), (5, 5) ], [ (6, 6) ], [ (7, 7), (8, 8) ], [ (9, 9), (10, 10) ], [ (11, 11) ], [ (12, 12), (13, 13) ], [ (14, 14), (15, 15) ], [ (16, 16) ], [ (17, 17), (18, 18) ], [ (19, 19), (20, 20) ] ]
3.spark.sql.files.minPartitionNumを設定し、パーティション数に変化があるか確認
-
下記のコードを実行します。
from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.sql.files.minPartitionNum", "12") print("spark.sql.files.minPartitionNum[{0}]".format(spark.conf.get('spark.sql.files.minPartitionNum'))) spark = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() df1 = spark.read.json("s3://xxxxx/input/split/test.json") rdd = df1.rdd.reduceByKey(lambda x, y: x + y) rdd.collect() print("Partitions [{0}]".format(rdd.getNumPartitions())) rdd.glom().collect()
-
念の為、JSONではなくParquetファイルでも試してみます。
from pyspark.sql import SparkSession from pyspark.sql import types as T, functions as F spark.conf.set("spark.sql.files.minPartitionNum", "12") print("spark.sql.files.minPartitionNum[{0}]".format(spark.conf.get('spark.sql.files.minPartitionNum'))) spark = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() df1 = spark.read.parquet("s3://XXXXX/input/parquet/part-00000-6881f5d6-9cae-446e-bb26-52bbda45d37d-c000.snappy.parquet") df1.show() print("Partitions [{0}]".format(df1.rdd.getNumPartitions())) df1.rdd.glom().collect()
考察
今回、spark.sql.files.minPartitionNum
を試してみましたが、期待していたとおりに分割されませんでした。
推奨される (保証されていない) 分割ファイル パーティションの最小数。
ドキュメントには、保証されないと書いてあるので、必ずしもこのパラメータだけで決まるのではなく、
環境や別の条件により変更されるのかもしれません。
引き続き、調査する予定ですが、分割する場合は、spark.sql.files.maxPartitionBytes
で調整したほうが無難と思いました。
参考