1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Sparkのパラメータとパーティション数の関係を確認してみた

Last updated at Posted at 2023-05-14

背景・目的

以前、Sparkのファイルとパーティションの関係について確認してみた という記事で、読み込むファイルと、ファイルフォーマット、パラメータの設定により、Spark内部で取り扱うパーティション数がどのように変化するかを確認しました。

今回は、分割可能なファイルフォーマット(JSON)を使用して、下記のパラメータを変更してどのように、パーティション数が変化するかを検証してみます。

  • spark.default.parallelism
  • spark.sql.files.minPartitionNum

まとめ

  • spark.default.parallelism
    • EMRなどのデフォルトクラスタマネージャーのYARNでは、ExecutorのCore数または、2のどちらか大きい方が採用される。
    • ファイルには効果がない
  • spark.sql.files.minPartitionNum
    • 今回の検証では、変化が見られなかった。
      • 保証されていないと記載されていたので、何か他の要因があり得るかもしれません。
      • 引き続き、調査する予定ですが、分割する場合は、spark.sql.files.maxPartitionBytesで調整したほうが無難と思いました。

概要

Sparkのconfigurationから抜粋。

spark.default.parallelism

For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
Local mode: number of cores on the local machine
Mesos fine grained mode: 8
Others: total number of cores on all executor nodes or 2, whichever is larger

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

  • reduceByKey や join などの分散シャッフル操作の場合、親 RDD 内のパーティションの最大数。
  • 親 RDD を使用しない並列化などの操作の場合、クラスターマネージャーによって異なる
    • ローカル モード: ローカル マシン上のコアの数
    • Mesos fine grained mode: 8
    • Others: すべてのエグゼキュータノード上のコアの合計数または 2 のどちらか大きい方
  • ユーザーによって設定されていない場合に、join、reduceByKey、Parallelize などの変換によって返される RDD のデフォルトのパーティション数。

spark.sql.files.minPartitionNum

The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is spark.default.parallelism. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

  • 推奨される (保証されていない) 分割ファイル パーティションの最小数。
  • 設定されていない場合、デフォルト値はspark.default.Parallelismが採用される。
  • この構成は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効。

実践

上記の仕様を踏まえて、下記のシナリオを検証する。

  1. spark.default.parallelismのみ設定し、パーティション数に変化があるか
  2. Coreノード(SparkのExecutorのCore数)を変更し、spark.default.parallelismとパーティション数に変化があるか
  3. spark.sql.files.minPartitionNumを設定し、パーティション数に変化があるか

前提

下記のファイルを作成します。

$cat test.json 
{"id":"1","value":1}
{"id":"2","value":2}
{"id":"3","value":3}
{"id":"4","value":4}
{"id":"5","value":5}
{"id":"6","value":6}
{"id":"7","value":7}
{"id":"8","value":8}
{"id":"9","value":9}
{"id":"10","value":10}
{"id":"11","value":11}
{"id":"12","value":12}
{"id":"13","value":13}
{"id":"14","value":14}
{"id":"15","value":15}
{"id":"16","value":16}
{"id":"17","value":17}
{"id":"18","value":18}
{"id":"19","value":19}
{"id":"20","value":20}
$

1. spark.default.parallelism のみ設定し、パーティション数に変化があるか確認

ファイルから読み込む

ファイルから読み込んだときには、park.default.parallelismは、効果ありません。

ユーザーによって設定されていない場合に、join、reduceByKey、Parallelize などの変換によって返される RDD のデフォルトのパーティション数。

ドキュメントに書いてあるように、join、reduceByKey、Parallelizeのみ、有効なようです。

  1. spark.default.parallelismを4に設定し、下記のコードを実行します。

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    
    spark.conf.set("spark.default.parallelism", "4")
    print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism')))
        
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    
    df1 = spark.read.json("s3://xxxxx/input/split/test.json")
    
    df1.show()
    print("Partitions [{0}]".format(df1.rdd.getNumPartitions()))
    df1.rdd.glom().collect()
    
    
  2. 実行します。
    image.png

  3. 結果は下記のとおりです。パーティションは1つです。(分割されません。)

    Partitions [1]
    [
        [
            Row(id='1', value=1),
            Row(id='2', value=2),
            Row(id='3', value=3),
            Row(id='4', value=4),
            Row(id='5', value=5),
            Row(id='6', value=6),
            Row(id='7', value=7),
            Row(id='8', value=8),
            Row(id='9', value=9),
            Row(id='10', value=10),
            Row(id='11', value=11),
            Row(id='12', value=12),
            Row(id='13', value=13),
            Row(id='14', value=14),
            Row(id='15', value=15),
            Row(id='16', value=16),
            Row(id='17', value=17),
            Row(id='18', value=18),
            Row(id='19', value=19),
            Row(id='20', value=20)
        ]
    ]
    
  4. 念の為、spark.default.parallelismを2に変えて試してみます。

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    
    spark.conf.set("spark.default.parallelism", "10")
    print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism')))
        
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    
    df1 = spark.read.json("s3://xxxxx/input/split/test.json")
    
    df1.show()
    print("Partitions [{0}]".format(df1.rdd.getNumPartitions()))
    df1.rdd.glom().collect()
    
    
  5. 結果を確認します。やはり、パーティションは変更されません。(1のままです。)
    image.png

    Partitions [1]
    

SparkContextのparallelizeから作成する

  1. spark.default.parallelismを1に設定し、下記のコードを実行します。

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    spark.conf.set("spark.default.parallelism", "1")
    print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism')))
    print("spark.executor.cores[{0}]".format(spark.conf.get('spark.executor.cores')))
    
    schema = T.StructType([
        T.StructField('id', T.StringType()),
        T.StructField('value', T.LongType())
    ])
    
    list = [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10), (11,11), (12,12), (13,13), (14,14), (15,15), (16,16), (17,17), (18,18), (19,19), (20,20)]
    
    
    rdd = sc.parallelize(list)
    
    
    print("Partitions [{0}]".format(rdd.getNumPartitions()))
    rdd.glom().collect()
    
  2. 実行します。
    image.png

  3. 結果は下記のとおりです。パーティションは2つです。(分割されました。)

    spark.default.parallelism[1]
    spark.executor.cores[4]
    Partitions [2]
    
    [
        [
            Row(id='1', value=1),
            Row(id='2', value=2),
            Row(id='3', value=3),
            Row(id='4', value=4),
            Row(id='5', value=5)
        ],
        [
            Row(id='6', value=6),
            Row(id='7', value=7),
            Row(id='8', value=8),
            Row(id='9', value=9),
            Row(id='10', value=10)
        ],
        [
            Row(id='11', value=11),
            Row(id='12', value=12),
            Row(id='13', value=13),
            Row(id='14', value=14),
            Row(id='15', value=15)
        ],
        [
            Row(id='16', value=16),
            Row(id='17', value=17),
            Row(id='18', value=18),
            Row(id='19', value=19),
            Row(id='20', value=20)
        ]
    ]
    

2. Coreノード(SparkのExecutorのCore数)を変更し、spark.default.parallelismとパーティション数に変化があるか確認

  • Others: すべてのエグゼキュータノード上のコアの合計数または 2 のどちらか大きい方

Excutor(のCore)を増やして、変化があるか確認しました。
ExecutorあたりのCore数が4つ。ノードが3つなので、12 パーティションとなりました。

  1. EMRのコアノードとタスクノードを1つから2つに増やします。
    image.png
    image.png

  2. リクエストされます。
    image.png
    image.png

  3. しばらくすると、追加されます。
    image.png
    image.png

  4. 下記のコードを実行します。

    
    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    spark.conf.set("spark.default.parallelism", "1")
    print("spark.default.parallelism[{0}]".format(spark.conf.get('spark.default.parallelism')))
    print("spark.executor.cores[{0}]".format(spark.conf.get('spark.executor.cores')))
    
    schema = T.StructType([
        T.StructField('id', T.StringType()),
        T.StructField('value', T.LongType())
    ])
    
    list = [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10), (11,11), (12,12), (13,13), (14,14), (15,15), (16,16), (17,17), (18,18), (19,19), (20,20)]
    
    rdd = sc.parallelize(list)
    
    
    print("Partitions [{0}]".format(rdd.getNumPartitions()))
    rdd.glom().collect()
    
  5. 実行します。
    image.png

  6. 結果は下記のとおりです。パーティションは12個に分割されました。

    spark.default.parallelism[1]
    spark.executor.cores[4]
    Partitions [12]
    [
        [
            (1, 1)
        ], 
        [
            (2, 2),
             (3, 3)
        ], 
        [
            (4, 4),
             (5, 5)
        ],
        [
            (6, 6)
        ],
        [
            (7, 7),
             (8, 8)
        ],
        [
            (9, 9),
            (10, 10)
        ],
        [
            (11, 11)
        ],
        [
            (12, 12),
            (13, 13)
        ],
        [
            (14, 14),
            (15, 15)
        ],
        [
            (16, 16)
        ],
        [
            (17, 17),
            (18, 18)
        ],
        [
            (19, 19),
             (20, 20)
        ]
    ]
    
    

3.spark.sql.files.minPartitionNumを設定し、パーティション数に変化があるか確認

  1. 下記のコードを実行します。

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    
    spark.conf.set("spark.sql.files.minPartitionNum", "12")
    print("spark.sql.files.minPartitionNum[{0}]".format(spark.conf.get('spark.sql.files.minPartitionNum')))
        
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    
    df1 = spark.read.json("s3://xxxxx/input/split/test.json")
    rdd = df1.rdd.reduceByKey(lambda x, y: x + y)
    rdd.collect()
    
    print("Partitions [{0}]".format(rdd.getNumPartitions()))
    rdd.glom().collect()
    
    
  2. パーティションは1のままです。
    image.png

  3. 念の為、JSONではなくParquetファイルでも試してみます。

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T, functions as F
    
    
    spark.conf.set("spark.sql.files.minPartitionNum", "12")
    print("spark.sql.files.minPartitionNum[{0}]".format(spark.conf.get('spark.sql.files.minPartitionNum')))
        
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    
    df1 = spark.read.parquet("s3://XXXXX/input/parquet/part-00000-6881f5d6-9cae-446e-bb26-52bbda45d37d-c000.snappy.parquet")
    
    df1.show()
    print("Partitions [{0}]".format(df1.rdd.getNumPartitions()))
    df1.rdd.glom().collect()
    
    
  4. 分割されませんでした。
    image.png

考察

今回、spark.sql.files.minPartitionNumを試してみましたが、期待していたとおりに分割されませんでした。

推奨される (保証されていない) 分割ファイル パーティションの最小数。

ドキュメントには、保証されないと書いてあるので、必ずしもこのパラメータだけで決まるのではなく、
環境や別の条件により変更されるのかもしれません。

引き続き、調査する予定ですが、分割する場合は、spark.sql.files.maxPartitionBytesで調整したほうが無難と思いました。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?