1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spark(on EMR)のパラメータを確認してみた

Last updated at Posted at 2023-05-28

背景・目的

こちらの、Amazon EMR で Spark アプリケーションを成功させるための設定を読み解いて、SparkやEMRのパラメータの理解を深めてみます。

まとめ

  • Sparkは、メモリを使用して並列分散処理を行うため、メモリへの依存が大きく問題が発生しやすい。代表的なエラーは下記のようなものがある。
    • メモリ不足エラー(Java Heap Space)
    • メモリ不足エラー(物理メモリの超過)
    • メモリ不足エラー(仮想メモリの超過)
    • メモリ不足エラー(Executorメモリの超過)
  • ベストプラクティスは、下記の通りです。
    • ワークロードにあわせて、適切なインスタンスタイプと数を設定する。
    • 適切なサイズが決まっていなければ、spark.dynamicAllocation.enabledをFalseにしてサイジングする。
      • spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutorsを決める。
    • 最初は、ドライバーメモリ、エグゼキュータメモリ、および CPU パラメータを自分自身で制御する。

概要

以降の記事は、Amazon EMR で Spark アプリケーションを成功させるための設定を元に整理します。

Sparkの特徴と課題

Apache Spark は、オープンソースで高速な汎用目的のクラスターコンピューティングソフトウェアで、ビッグデータの分散処理で広く利用されています。Apache Spark は、タスクの I/O と実行時間を削減するためにノード全体のメモリで並行コンピューティングを実行することから、クラスターメモリ (RAM) に大きく依存しています。

Spark アプリケーションを成功させるには、データと処理の要件に基づいて Spark アプリケーションを適切に設定することが大切です。デフォルト設定では Spark が利用できるクラスターのリソースのすべてを使用しない場合があり、物理メモリまたは仮想メモリの問題、あるいはその両方が発生する可能性があります。Stackoverflow.com では、この特定のトピックに関連する何千もの質問が提起されています。

  • Sparkは、下記の特徴があるとのことです。
    • OSSのソフトウェア
    • ビッグデータの分散処理向き
    • ノード全体のメモリで並行コンピューティングを実行するアーキテクチャ
    • メモリに大きく依存する
    • メモリの問題が発生する

デフォルト設定または不適切な設定の Spark アプリケーションにおける一般的なメモリ問題

デフォルト設定または不適切な設定により、下記のようなメモリ不足エラーが起きます。

  • メモリ不足エラー(Java Heap Space)
    WARN TaskSetManager: Loss was due to 
    java.lang.OutOfMemoryError
    java.lang.OutOfMemoryError: Java heap space
    
  • メモリ不足エラー(物理メモリの超過)
    Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
    12.4 GB of 12.3 GB physical memory used.
    Consider boosting spark.yarn.executor.memoryOverhead.
    Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
    4.5GB of 3GB physical memory used limits.
    Consider boosting spark.yarn.executor.memoryOverhead.
    
  • メモリ不足エラー(仮想メモリの超過)
    Container killed by YARN for exceeding memory limits.
    1.1gb of 1.0gb virtual memory used.Killing container.
    
  • メモリ不足エラー(Executorメモリの超過)
    Required executor memory (1024+384 MB) is above 
    the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
    and/or 'yarn.nodemanager.resource.memory-mb
    

上記のような、メモリ不足エラーの理由例を下記に記載します。

  1. Spark エグゼキュータインスタンスの数、エグゼキュータメモリの量、コアの数、または並列性が、大量のデータを処理するために適切に設定されていない場合。
  2. Spark エグゼキュータの物理メモリが YARN によって割り当てられたメモリを超過する場合。この場合、Spark エグゼキュータインスタンスのメモリとメモリオーバーヘッドの合計が、メモリ集約型の操作を処理するために十分な量ではありません。メモリ集約型の操作には、reduceByKey、groupBy などを使用したキャッシング、シャッフリング、および集約が含まれます。または、Spark エグゼキュータインスタンスのメモリとメモリオーバーヘッドの合計が yarn.scheduler.maximum-allocation-mb で定義されている量を超えている場合もあります。
  3. ガベージコレクションなどのシステム操作の実行に必要なメモリが Spark エグゼキュータインスタンスにない。
  • Sparkのリソースが不適切
  • Sparkの物理メモリ>YARNメモリ
  • 集約型操作に必要なリソース>Executorメモリ+メモリオーバーヘッド
  • GC等のシステム操作に必要なメモリ不足

Amazon EMR で Spark アプリケーションを成功させるための設定

1.アプリケーションのニーズに基づいてインスタンスのタイプと数を判断する

ワークロードにより適切なインスタンスタイプと数を設定する。

EMRクラスタには、3つのタイプがあり、ワークロードにより適切なインスタンスタイプと数を設定します。

  • プライマリー(マスタ)
    • リソースマネージャー
  • コア
    • 下記のようなプロセスやデーモン等が実行されます。
      • YARNのNodeManager
      • HadoopのMapReduce
      • SparkのExecutor
  • タスク
    • コアノードとの違いは、データの保存は行わない。

2.Spark 設定パラメータを決定する

spark.dynamicAllocation.enabledは、spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutorsが数が適切に決定されている場合のみTrueとする。
それ以外は、ドライバーメモリ、エグゼキュータメモリ、および CPU パラメータを自分自身で制御する。

アプリケーション要件に基づいて、前述の追加プロパティを慎重に計算します。Spark アプリケーションをサブミットするとき (spark-submit)、または SparkConf オブジェクト内において、Spark-defaults でこれらのプロパティを適切に設定する。

image.png
※画像は、Amazon EMR で Apache Spark アプリケーションのメモリをうまく管理するためのベストプラクティス>2.Spark 設定パラメータを決定するから引用。

下記にSparkのパラメータの意味を解説します。

カテゴリ パラメータ名 説明
Driver spark.driver.memory ドライバーのために使用するメモリのサイズ。
spark.driver.cores ドライバーのために使用する仮想コアの数。
Executor spark.executor.memory タスクを実行する各エグゼキュータのために使用するメモリのサイズ。
spark.executor.cores 仮想コアの数。
spark.executor.instances ­ エグゼキュータの数。spark.dynamicAllocation.enabledtrue に設定されている場合以外は、このパラメータを設定します。
その他 spark.default.parallelism ユーザーによってパーティションの数が設定されていない場合に、joinreduceByKey、および parallelize などの変換によって返された RDD (Resilient Distributed Datasets) 内のパーティションのデフォルト数。

このリリースガイドでは、Amazon EMR がどのように Spark パラメータのデフォルト値を設定するかについてのおおまかな情報が提供されています。これらの値は spark-defaults 設定内で、クラスター内のコアインスタンスとタスクインスタンスのタイプに基づいて自動的に設定されます。

クラスター内で利用できるリソースのすべてを使用するには、maximizeResourceAllocation パラメータを true に設定してください。この EMR 固有のオプションは、コアインスタンスグループ内のインスタンスにあるエグゼキュータが利用できる最大のコンピューティングリソースとメモリリソースを計算してから、spark-defaults 設定でこれらのパラメータを設定します。この設定を使っても、ほとんどの場合デフォルトの数は少なく、アプリケーションはクラスターの力を完全に使用しません。例えば、並列性は規模が大きいクラスターのために高くできるにもかかわらず、spark.default.parallelism のデフォルトは利用可能な仮想コアの数の 2 倍に留まります。

YARN 上の Spark は、ワークロードに基づいて Spark アプリケーションに使用されるエグゼキュータの数を動的にスケールします。Amazon EMR のリリースバージョン 4.4.0 以降を使用すると、dynamic allocation がデフォルトで有効化されています (これは Spark ドキュメントで説明されています)。

spark.dynamicAllocation.enabled プロパティの問題は、これがサブプロパティの設定を必要とすることです。サブプロパティの例には、spark.dynamicAllocation.initialExecutors、minExecutors、および maxExecutors などがあります。サブプロパティはほとんどの場合、そして特に複数のアプリケーションを同時に実行する必要があるときに、アプリケーションのクラスターで正しい数のエグゼキュータを使用するために必要になります。サブプロパティの設定には、正確な数を把握するまでに数多くの試行錯誤が必要です。数が正確ではないと、キャパシティーが確保されていても、実際に使われることはありません。これはリソースの浪費、または他のアプリケーションに対するメモリエラーにつながります。

  • EMRには、maximizeResourceAllocationというパラメータがあります。
    • trueに設定することで下記の動作を行う。
      • コアインスタンスグループ内のインスタンスにあるExecutorが利用できる最大コンピューティングリソース、メモリを計算してから、spark-defaultsに、これらのパラメータを設定する。
    • しかし、デフォルトの数では少なく、アプリケーションはクラスタの完全に使用できない。
  • EMR4.4.0以降を使用した場合、dynamic allocationがデフォルトで有効化されている。
    • 有効な場合、Executorの数を動的にスケールする。
    • spark.dynamicAllocation.enabledで設定
      • spark.dynamicAllocationには、下記のサブプロパティがあります。
        • initialExecutors
        • minExecutors
        • maxExecutors
      • サブプロパティの設定は、適切に設定するまで試行錯誤が必要。 適切に設定されないと、リソースの浪費や、他のアプリケーションに対するメモリエラーにつながる。

設定例

前提

Amazon S3 に保存されている何千ものファイルに分散された 200 テラバイトのデータを処理するとしましょう。さらに、1 個の r5.12xlarge マスターノードと 19 個の r5.12xlarge コアノードを持つ Amazon EMR クラスターを使ってこれを行うと仮定します。各 r5.12xlarge インスタンスには 48 個の仮想コア (vCPU) と 384 GB の RAM があります。これらすべての計算は、AWS が本番使用向けに推奨する --deploy-mode クラスターのためのものです。

  • 200TBのデータを処理。数千のファイルに分散されている
  • プライマリーノード(マスターノード)
    • r5.12xlarge * 1ノード
  • コアノード
    • r5.12xlarge * 19ノード
  • デプロイモードは、Clusterモード
  • r5.12xlargeは、下記のスペックです。
    • 48 vCPU
    • 384 GB メモリ

パラメータの設定例

Sparkアプリケーション実行に必要な、基本的なパラメータと、設定例を記載します。

# パラメータ(計算項目) 設定値 説明
1 spark.executor.cores - 5個の仮想コア固定
2 (Number of executors per instance) (total number of virtual cores per instance - 1)/ spark.executors.cores

■ 設定例
(48vCPU - 1vCPU)/ 5 core = 47 / 5 = 9 (rounded down)
1 インスタンスあたりのエグゼキュータの数

Hadoop デーモン用に確保しておくため、仮想コアの合計数から仮想コアを 1 個差し引きます。
3 (Total executor memory) total RAM per instance / number of executors per instance

■ 設定例
(384 GB -1 GBメモリ) / 9 Executors = 42 (rounded down)
1インスタンスあたりの合計 RAMと 1 インスタンスあたりのエグゼキュータの数を使って、エグゼキュータメモリの合計容量を求めます。
Hadoop デーモン用に 1 GB を残しておいてください。
4 spark.yarn.executor.memoryOverhead total executor memory (上記の#3の結果) * 0.10

■ 設定例
42 * 0.1 = 5 (rounded down)
エグゼキュータメモリの合計容量の 10 パーセントをメモリオーバーヘッドに割り当てる。
5 spark.executor.memory total executor memory (上記の#3の結果) * 0.90

■ 設定例
42 * 0.9 = 37 (rounded down)
90 パーセントをエグゼキュータメモリに割り当てる。
6 spark.driver.cores spark.driver.cores= spark.executors.cores. spark.executors.cores と等しくなるように設定することをお勧めします。
7 spark.driver.memory spark.driver.memory = spark.executors.memory spark.executors.memory と等しくなるように設定することをお勧めします。
8 spark.executor.instances (number of executors per instance (上記の#2の結果) * number of core instances) minus 1 for the driver

■ 設定例
(9 * 19) - 1 = 170
エグゼキュータの数とインスタンスの合計数を乗じて計算します。ドライバー用にエグゼキュータを 1 個残しておいてください。
9 spark.default.parallelism spark.executor.instances (上記の#8の結果) * spark.executors.cores (上記の#1の結果) * 2

■ 設定例
170 * 5 * 2 = 1,700
この計算の結果は 1,700 個のパーティションになるが、各パーティションのサイズを見積り、coalesce または repartition を使ってこの数値を適切に調整を推奨とのこと。

データフレームの場合は、spark.default.parallelism と共に spark.sql.shuffle.partitions パラメータを設定します。

その他のパラメータについても記載します。

パラメータ 説明
spark.network.timeout すべてのネットワークトランザクションのタイムアウト。
spark.executor.heartbeatInterval ドライバーに対する各エグゼキュータのハートビートの間隔

※ この値は、spark.network.timeout よりも大幅に少ない値である必要があるとのこと。
spark.memory.fraction Spark の実行とストレージに使用される JVM ヒープ領域の割合。

この値が低いほど、スピルとキャッシュデータの削除がより頻繁に発生します。
spark.memory.storageFraction spark.memory.fraction によって確保されたリージョンのサイズの一部として表されます。

この値が高いほど、実行用に利用できる作業メモリが少なくなる可能性があります。これは、タスクがディスクにより頻繁にスピルする可能性があることを意味します。
spark.yarn.scheduler.reporterThread.maxFailures YARN がアプリケーションを失敗させるまでに許容されるエグゼキュータの最大失敗回数。
spark.rdd.compress true に設定すると、このプロパティは RDD を圧縮することによって、追加の CPU 時間を代償にかなりの領域を節約することができます。
spark.shuffle.compress true に設定すると、このプロパティはマップ出力を圧縮して領域を節約します。
spark.shuffle.spill.compress true に設定すると、このプロパティはシャッフル中にスピルされたデータを圧縮します。
spark.sql.shuffle.partitions 結合と集約のためのパーティションの数を設定します。
spark.serializer データをシリアライズまたはデシリアライズするシリアライザーを設定します。

シリアライザーとしては、Kyro (org.apache.spark.serializer.KryoSerializer) がよい。Java のデフォルトシリアライザーよりも高速でコンパクト。

3.適切なガベージコレクターを実装してメモリを効率的にクリアする

ガベージコレクションは、特定のケースでメモリ不足エラーを引き起こす場合があります。これには、アプリケーション内に複数の大規模な RDD が存在するケースが含まれます。他のケースは、タスク実行メモリと RDD キャッシュメモリの間に干渉がある場合に発生します。

  • 下記のような場合に、GCによりメモリ不足を引き起こす場合がある。
    • アプリ内に大規模なRDDが存在する場合
    • タスク実行メモリと、RDDキャッシュメモリの間に干渉がある場合

古いオブジェクトをエビクトして、メモリ内に新しいオブジェクトを設置するために、複数のガベージコレクターを使用することができます。ただし、最新の Garbage First Garbage Collector (G1GC) は、古いガベージコレクターのレイテンシーとスループットの制限を克服します。

```
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
```
  • G1GCを使用するのがよい。

4.YARN 設定パラメータを設定する

すべての Spark 設定プロパティが正しく計算され、設定されているとしても、OS によって仮想メモリが積極的に増やされるときに、まれではありますが、仮想メモリ不足エラーが生じる可能性があります。これらのアプリケーションの失敗を防ぐには、YARN サイト設定に以下のフラグを設定します。

```
"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"
```
  • OSにより仮想メモリ不足エラーが引き起こされる可能性がある。

5.デバッグとモニタリングを実行する

Spark 設定オプションがどこからきているかの詳細を得るには、–verbose オプションを付けて spark-submit を実行することができます。また、Ganglia および Spark UI を使ってアプリケーションの進捗状況、クラスターの RAM 使用率、ネットワーク I/O などを監視することができます。

  • Ganglia、Spark UIを使って監視・確認する

実践

下記のようなシナリオで、挙動を確認します。

  • EMRを起動した状態を確認
  • OOM

EMRを起動した状態を確認

emr-6.10.0を起動した状態の設定を確認します。

デフォルトでは、推奨されるパラメータの設定はされていません。

  1. クラスタのインスタンスグループの内容は下記のとおりです。

    • プライマリグループ
      • m6a.xlarge
        • 4 vCore、16 GiB メモリ, EBS
    • コアグループ
      • m6a.xlarge
        • 4 vCore、16 GiB メモリ, EBS
  2. maximizeResourceAllocationを確認します。falseでした。

    $ aws emr describe-cluster --cluster-id j-XXXXXXXX --region us-west-2 | grep -10 maximizeResourceAllocation
                }
            ],
            "Tags": [],
            "ServiceRole": "arn:aws:iam::xxxxxx:role/EMR_DefaultRole",
            "NormalizedInstanceHours": 16,
            "MasterPublicDnsName": "xxxxxxxxxx.us-west-2.compute.internal",
            "Configurations": [
                {
                    "Classification": "spark",
                    "Properties": {
                        "maximizeResourceAllocation": "false"
                    }
                },
                {
                    "Classification": "spark-hive-site",
                    "Properties": {
                        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                    }
                }
            ],
            "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
    
  3. EMR Studioで設定を確認します。

    from pyspark.sql import SparkSession
    
    session = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    sc = session.sparkContext
    conf = sc.getConf()
    
    print("spark.dynamicAllocation.enabled:[{0}]".format(conf.get("spark.dynamicAllocation.enabled")))
    print("spark.dynamicAllocation.initialExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.initialExecutors")))
    print("spark.dynamicAllocation.minExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.minExecutors")))
    print("spark.dynamicAllocation.maxExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.maxExecutors")))
    
    print("spark.executor.cores:[{0}]".format(conf.get("spark.executor.cores")))
    print("spark.yarn.executor.memoryOverhead:[{0}]".format(conf.get("spark.yarn.executor.memoryOverhead")))
    print("spark.executor.memory:[{0}]".format(conf.get("spark.executor.memory")))
    
    print("spark.driver.cores:[{0}]".format(conf.get("spark.driver.cores")))
    print("spark.driver.memory:[{0}]".format(conf.get("spark.driver.memory")))
    print("spark.executor.instances:[{0}]".format(conf.get("spark.executor.instances")))
    
    print("spark.default.parallelism:[{0}]".format(conf.get("spark.default.parallelism")))
    
    
    print("spark.network.timeout:[{0}]".format(conf.get("spark.network.timeout")))
    print("spark.executor.heartbeatInterval:[{0}]".format(conf.get("spark.executor.heartbeatInterval")))
    
    print("spark.memory.fraction:[{0}]".format(conf.get("spark.memory.fraction")))
    print("spark.memory.storageFraction:[{0}]".format(conf.get("spark.memory.storageFraction")))
    print("spark.yarn.scheduler.reporterThread.maxFailures:[{0}]".format(conf.get("spark.yarn.scheduler.reporterThread.maxFailures")))
    print("spark.rdd.compress:[{0}]".format(conf.get("spark.rdd.compress")))
    print("spark.shuffle.compress:[{0}]".format(conf.get("spark.shuffle.compress")))
    print("spark.shuffle.spill.compress:[{0}]".format(conf.get("spark.shuffle.spill.compress")))
    print("spark.sql.shuffle.partitions:[{0}]".format(conf.get("spark.sql.shuffle.partitions")))
    print("spark.serializer:[{0}]".format(conf.get("spark.serializer")))
    
    ===
    
    spark.dynamicAllocation.enabled:[true]
    spark.dynamicAllocation.initialExecutors:[None]
    spark.dynamicAllocation.minExecutors:[None]
    spark.dynamicAllocation.maxExecutors:[None]
    spark.executor.cores:[2]
    spark.yarn.executor.memoryOverhead:[None]
    spark.executor.memory:[4921M]
    spark.driver.cores:[None]
    spark.driver.memory:[2048M]
    spark.executor.instances:[None]
    spark.default.parallelism:[None]
    spark.network.timeout:[None]
    spark.executor.heartbeatInterval:[None]
    spark.memory.fraction:[None]
    spark.memory.storageFraction:[None]
    spark.yarn.scheduler.reporterThread.maxFailures:[None]
    spark.rdd.compress:[None]
    spark.shuffle.compress:[None]
    spark.shuffle.spill.compress:[None]
    spark.sql.shuffle.partitions:[None]
    spark.serializer:[None]
    
    
    パラメータ 設定値 備考
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.initialExecutors None
    spark.dynamicAllocation.minExecutors None
    spark.dynamicAllocation.maxExecutors None
    spark.executor.cores 2 "5"という想定だが、想定より少ない。
    spark.yarn.executor.memoryOverhead None
    spark.executor.memory 4921M
    spark.driver.cores None
    spark.driver.memory 2048M spark.executors.memoryという同一という想定だが異なる。
    spark.executor.instances None
    spark.default.parallelism None
    spark.network.timeout None
    spark.executor.heartbeatInterval None
    spark.memory.fraction None
    spark.memory.storageFraction None
    spark.yarn.scheduler.reporterThread.maxFailures None
    spark.rdd.compress None
    spark.shuffle.compress None
    spark.shuffle.spill.compress None
    spark.sql.shuffle.partitions None
    spark.serializer None
  4. YARNのTimeline ServerのUIから yarn.scheduler.maximum-allocation-mbを確認します。8129MBでした。

    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>8192</value>
        <final>false</final>
        <source>yarn-default.xml</source>
    </property>
    

Executorメモリ < ファイルサイズ

ファイルサイズよりもExecutorメモリを小さくしてみます。

  1. ファイルサイズが11.6GBです。
    image.png

  2. Executor(spark.executor.memory)のメモリを 1GiB(1024M)に設定してみます。ちなみにベストプラクティスに沿った場合、下記の設定になります。

  • m6a.xlargeインスタンス
    • 4 vCore
    • 16 GiB メモリ
  • Number of executors per instance = 4 vCore -1 vCore / 2 = 1
  • Total executor memory = 16GiB - 1GiB / 1 = 15GiB
  • spark.executor.memory = 15GiB * 0.9 = 13.5 ≒ 13 GiB
  • spark.yarn.executor.memoryOverhead = 15 GiB * 0.1 = 1.5 GiB ≒ 1 GiB
  1. 下記のパラメータを設定してEMRクラスタを作成します。

    [
      {
        "Classification": "spark",
        "Properties": {
          "maximizeResourceAllocation": "false"
        }
      },
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.executor.memory": "1024M"
        }
      }
    ]
    
  2. 設定値を確認します。設定した通り、spark.executor.memoryは1024M(1GiB)になりました。

    from pyspark.sql import SparkSession
    
    session = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    sc = session.sparkContext
    conf = sc.getConf()
    
    print("spark.dynamicAllocation.enabled:[{0}]".format(conf.get("spark.dynamicAllocation.enabled")))
    print("spark.dynamicAllocation.initialExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.initialExecutors")))
    print("spark.dynamicAllocation.minExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.minExecutors")))
    print("spark.dynamicAllocation.maxExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.maxExecutors")))
    
    
    print("spark.executor.cores:[{0}]".format(conf.get("spark.executor.cores")))
    print("spark.yarn.executor.memoryOverhead:[{0}]".format(conf.get("spark.yarn.executor.memoryOverhead")))
    print("spark.executor.memory:[{0}]".format(conf.get("spark.executor.memory")))
    
    
    print("spark.driver.cores:[{0}]".format(conf.get("spark.driver.cores")))
    print("spark.driver.memory:[{0}]".format(conf.get("spark.driver.memory")))
    print("spark.executor.instances:[{0}]".format(conf.get("spark.executor.instances")))
    
    print("spark.default.parallelism:[{0}]".format(conf.get("spark.default.parallelism")))
    
    
    print("spark.network.timeout:[{0}]".format(conf.get("spark.network.timeout")))
    print("spark.executor.heartbeatInterval:[{0}]".format(conf.get("spark.executor.heartbeatInterval")))
    
    print("spark.memory.fraction:[{0}]".format(conf.get("spark.memory.fraction")))
    print("spark.memory.storageFraction:[{0}]".format(conf.get("spark.memory.storageFraction")))
    print("spark.yarn.scheduler.reporterThread.maxFailures:[{0}]".format(conf.get("spark.yarn.scheduler.reporterThread.maxFailures")))
    print("spark.rdd.compress:[{0}]".format(conf.get("spark.rdd.compress")))
    print("spark.shuffle.compress:[{0}]".format(conf.get("spark.shuffle.compress")))
    print("spark.shuffle.spill.compress:[{0}]".format(conf.get("spark.shuffle.spill.compress")))
    print("spark.sql.shuffle.partitions:[{0}]".format(conf.get("spark.sql.shuffle.partitions")))
    print("spark.serializer:[{0}]".format(conf.get("spark.serializer")))
    
    
    print("yarn.scheduler.maximum-allocation-mb:[{0}]".format(conf.get("yarn.scheduler.maximum-allocation-mb")))
    
    ===
    
    spark.dynamicAllocation.enabled:[true]
    spark.dynamicAllocation.initialExecutors:[None]
    spark.dynamicAllocation.minExecutors:[None]
    spark.dynamicAllocation.maxExecutors:[None]
    spark.executor.cores:[2]
    spark.yarn.executor.memoryOverhead:[None]
    spark.executor.memory:[1024M]
    spark.driver.cores:[None]
    spark.driver.memory:[2048M]
    spark.executor.instances:[None]
    spark.default.parallelism:[None]
    spark.network.timeout:[None]
    spark.executor.heartbeatInterval:[None]
    spark.memory.fraction:[None]
    spark.memory.storageFraction:[None]
    spark.yarn.scheduler.reporterThread.maxFailures:[None]
    spark.rdd.compress:[None]
    spark.shuffle.compress:[None]
    spark.shuffle.spill.compress:[None]
    spark.sql.shuffle.partitions:[None]
    spark.serializer:[None]
    yarn.scheduler.maximum-allocation-mb:[None]
    
    
  3. パーティション分割せずに、1つのタスクで読み込みます。

    spark.conf.set('spark.sql.files.maxPartitionBytes','12884901888')
    df3 = spark.read.json("s3://XXXXX/input/json/largedata/one-file/20230528-3/")
    
    df4 = df3.groupBy("group").count()
    
    df4.printSchema()
    df4.count()
    
  4. 成功してしまいました。想定ではエラーになると思っていました。

    root
     |-- group: string (nullable = true)
     |-- count: long (nullable = false)
    
    10
    

考察

今回は、主に、Sparkのメモリについての設定を学びました。
実践では、メモリの設定値を変更してエラーになるケースを試してみましたが、想定通りうまくいきませんでした。
今後、引き続きチューニングを行い、効果を検証してみようと思います。

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?