背景・目的
こちらの、Amazon EMR で Spark アプリケーションを成功させるための設定を読み解いて、SparkやEMRのパラメータの理解を深めてみます。
まとめ
- Sparkは、メモリを使用して並列分散処理を行うため、メモリへの依存が大きく問題が発生しやすい。代表的なエラーは下記のようなものがある。
- メモリ不足エラー(Java Heap Space)
- メモリ不足エラー(物理メモリの超過)
- メモリ不足エラー(仮想メモリの超過)
- メモリ不足エラー(Executorメモリの超過)
- ベストプラクティスは、下記の通りです。
- ワークロードにあわせて、適切なインスタンスタイプと数を設定する。
- 適切なサイズが決まっていなければ、spark.dynamicAllocation.enabledをFalseにしてサイジングする。
- spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutorsを決める。
- 最初は、ドライバーメモリ、エグゼキュータメモリ、および CPU パラメータを自分自身で制御する。
概要
以降の記事は、Amazon EMR で Spark アプリケーションを成功させるための設定を元に整理します。
Sparkの特徴と課題
Apache Spark は、オープンソースで高速な汎用目的のクラスターコンピューティングソフトウェアで、ビッグデータの分散処理で広く利用されています。Apache Spark は、タスクの I/O と実行時間を削減するためにノード全体のメモリで並行コンピューティングを実行することから、クラスターメモリ (RAM) に大きく依存しています。
Spark アプリケーションを成功させるには、データと処理の要件に基づいて Spark アプリケーションを適切に設定することが大切です。デフォルト設定では Spark が利用できるクラスターのリソースのすべてを使用しない場合があり、物理メモリまたは仮想メモリの問題、あるいはその両方が発生する可能性があります。Stackoverflow.com では、この特定のトピックに関連する何千もの質問が提起されています。
- Sparkは、下記の特徴があるとのことです。
- OSSのソフトウェア
- ビッグデータの分散処理向き
- ノード全体のメモリで並行コンピューティングを実行するアーキテクチャ
- メモリに大きく依存する
- メモリの問題が発生する
デフォルト設定または不適切な設定の Spark アプリケーションにおける一般的なメモリ問題
デフォルト設定または不適切な設定により、下記のようなメモリ不足エラーが起きます。
- メモリ不足エラー(Java Heap Space)
WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: Java heap space
- メモリ不足エラー(物理メモリの超過)
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits. 12.4 GB of 12.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits. 4.5GB of 3GB physical memory used limits. Consider boosting spark.yarn.executor.memoryOverhead.
- メモリ不足エラー(仮想メモリの超過)
Container killed by YARN for exceeding memory limits. 1.1gb of 1.0gb virtual memory used.Killing container.
- メモリ不足エラー(Executorメモリの超過)
Required executor memory (1024+384 MB) is above the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb
上記のような、メモリ不足エラーの理由例を下記に記載します。
- Spark エグゼキュータインスタンスの数、エグゼキュータメモリの量、コアの数、または並列性が、大量のデータを処理するために適切に設定されていない場合。
- Spark エグゼキュータの物理メモリが YARN によって割り当てられたメモリを超過する場合。この場合、Spark エグゼキュータインスタンスのメモリとメモリオーバーヘッドの合計が、メモリ集約型の操作を処理するために十分な量ではありません。メモリ集約型の操作には、reduceByKey、groupBy などを使用したキャッシング、シャッフリング、および集約が含まれます。または、Spark エグゼキュータインスタンスのメモリとメモリオーバーヘッドの合計が yarn.scheduler.maximum-allocation-mb で定義されている量を超えている場合もあります。
- ガベージコレクションなどのシステム操作の実行に必要なメモリが Spark エグゼキュータインスタンスにない。
- Sparkのリソースが不適切
- Sparkの物理メモリ>YARNメモリ
- 集約型操作に必要なリソース>Executorメモリ+メモリオーバーヘッド
- GC等のシステム操作に必要なメモリ不足
Amazon EMR で Spark アプリケーションを成功させるための設定
1.アプリケーションのニーズに基づいてインスタンスのタイプと数を判断する
ワークロードにより適切なインスタンスタイプと数を設定する。
EMRクラスタには、3つのタイプがあり、ワークロードにより適切なインスタンスタイプと数を設定します。
- プライマリー(マスタ)
- リソースマネージャー
- コア
- 下記のようなプロセスやデーモン等が実行されます。
- YARNのNodeManager
- HadoopのMapReduce
- SparkのExecutor
- 下記のようなプロセスやデーモン等が実行されます。
- タスク
- コアノードとの違いは、データの保存は行わない。
2.Spark 設定パラメータを決定する
spark.dynamicAllocation.enabledは、spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutorsが数が適切に決定されている場合のみTrueとする。
それ以外は、ドライバーメモリ、エグゼキュータメモリ、および CPU パラメータを自分自身で制御する。
アプリケーション要件に基づいて、前述の追加プロパティを慎重に計算します。Spark アプリケーションをサブミットするとき (spark-submit)、または SparkConf オブジェクト内において、Spark-defaults でこれらのプロパティを適切に設定する。
※画像は、Amazon EMR で Apache Spark アプリケーションのメモリをうまく管理するためのベストプラクティス>2.Spark 設定パラメータを決定するから引用。
下記にSparkのパラメータの意味を解説します。
カテゴリ | パラメータ名 | 説明 |
---|---|---|
Driver | spark.driver.memory | ドライバーのために使用するメモリのサイズ。 |
spark.driver.cores | ドライバーのために使用する仮想コアの数。 | |
Executor | spark.executor.memory | タスクを実行する各エグゼキュータのために使用するメモリのサイズ。 |
spark.executor.cores | 仮想コアの数。 | |
spark.executor.instances | エグゼキュータの数。spark.dynamicAllocation.enabled が true に設定されている場合以外は、このパラメータを設定します。 |
|
その他 | spark.default.parallelism | ユーザーによってパーティションの数が設定されていない場合に、join 、reduceByKey 、および parallelize などの変換によって返された RDD (Resilient Distributed Datasets) 内のパーティションのデフォルト数。 |
このリリースガイドでは、Amazon EMR がどのように Spark パラメータのデフォルト値を設定するかについてのおおまかな情報が提供されています。これらの値は spark-defaults 設定内で、クラスター内のコアインスタンスとタスクインスタンスのタイプに基づいて自動的に設定されます。
クラスター内で利用できるリソースのすべてを使用するには、maximizeResourceAllocation パラメータを true に設定してください。この EMR 固有のオプションは、コアインスタンスグループ内のインスタンスにあるエグゼキュータが利用できる最大のコンピューティングリソースとメモリリソースを計算してから、spark-defaults 設定でこれらのパラメータを設定します。この設定を使っても、ほとんどの場合デフォルトの数は少なく、アプリケーションはクラスターの力を完全に使用しません。例えば、並列性は規模が大きいクラスターのために高くできるにもかかわらず、spark.default.parallelism のデフォルトは利用可能な仮想コアの数の 2 倍に留まります。
YARN 上の Spark は、ワークロードに基づいて Spark アプリケーションに使用されるエグゼキュータの数を動的にスケールします。Amazon EMR のリリースバージョン 4.4.0 以降を使用すると、dynamic allocation がデフォルトで有効化されています (これは Spark ドキュメントで説明されています)。
spark.dynamicAllocation.enabled プロパティの問題は、これがサブプロパティの設定を必要とすることです。サブプロパティの例には、spark.dynamicAllocation.initialExecutors、minExecutors、および maxExecutors などがあります。サブプロパティはほとんどの場合、そして特に複数のアプリケーションを同時に実行する必要があるときに、アプリケーションのクラスターで正しい数のエグゼキュータを使用するために必要になります。サブプロパティの設定には、正確な数を把握するまでに数多くの試行錯誤が必要です。数が正確ではないと、キャパシティーが確保されていても、実際に使われることはありません。これはリソースの浪費、または他のアプリケーションに対するメモリエラーにつながります。
- EMRには、maximizeResourceAllocationというパラメータがあります。
- trueに設定することで下記の動作を行う。
- コアインスタンスグループ内のインスタンスにあるExecutorが利用できる最大コンピューティングリソース、メモリを計算してから、spark-defaultsに、これらのパラメータを設定する。
- しかし、デフォルトの数では少なく、アプリケーションはクラスタの完全に使用できない。
- trueに設定することで下記の動作を行う。
- EMR4.4.0以降を使用した場合、dynamic allocationがデフォルトで有効化されている。
- 有効な場合、Executorの数を動的にスケールする。
-
spark.dynamicAllocation.enabled
で設定- spark.dynamicAllocationには、下記のサブプロパティがあります。
- initialExecutors
- minExecutors
- maxExecutors
- サブプロパティの設定は、適切に設定するまで試行錯誤が必要。 適切に設定されないと、リソースの浪費や、他のアプリケーションに対するメモリエラーにつながる。
- spark.dynamicAllocationには、下記のサブプロパティがあります。
設定例
前提
Amazon S3 に保存されている何千ものファイルに分散された 200 テラバイトのデータを処理するとしましょう。さらに、1 個の r5.12xlarge マスターノードと 19 個の r5.12xlarge コアノードを持つ Amazon EMR クラスターを使ってこれを行うと仮定します。各 r5.12xlarge インスタンスには 48 個の仮想コア (vCPU) と 384 GB の RAM があります。これらすべての計算は、AWS が本番使用向けに推奨する --deploy-mode クラスターのためのものです。
- 200TBのデータを処理。数千のファイルに分散されている
- プライマリーノード(マスターノード)
- r5.12xlarge * 1ノード
- コアノード
- r5.12xlarge * 19ノード
- デプロイモードは、Clusterモード
- r5.12xlargeは、下記のスペックです。
- 48 vCPU
- 384 GB メモリ
パラメータの設定例
Sparkアプリケーション実行に必要な、基本的なパラメータと、設定例を記載します。
# | パラメータ(計算項目) | 設定値 | 説明 |
---|---|---|---|
1 | spark.executor.cores | - | 5個の仮想コア固定 |
2 | (Number of executors per instance) | (total number of virtual cores per instance - 1)/ spark.executors.cores ■ 設定例 (48vCPU - 1vCPU)/ 5 core = 47 / 5 = 9 (rounded down) |
1 インスタンスあたりのエグゼキュータの数 Hadoop デーモン用に確保しておくため、仮想コアの合計数から仮想コアを 1 個差し引きます。 |
3 | (Total executor memory) | total RAM per instance / number of executors per instance ■ 設定例 (384 GB -1 GBメモリ) / 9 Executors = 42 (rounded down) |
1インスタンスあたりの合計 RAMと 1 インスタンスあたりのエグゼキュータの数を使って、エグゼキュータメモリの合計容量を求めます。 Hadoop デーモン用に 1 GB を残しておいてください。 |
4 | spark.yarn.executor.memoryOverhead | total executor memory (上記の#3の結果) * 0.10 ■ 設定例 42 * 0.1 = 5 (rounded down) |
エグゼキュータメモリの合計容量の 10 パーセントをメモリオーバーヘッドに割り当てる。 |
5 | spark.executor.memory | total executor memory (上記の#3の結果) * 0.90 ■ 設定例 42 * 0.9 = 37 (rounded down) |
90 パーセントをエグゼキュータメモリに割り当てる。 |
6 | spark.driver.cores | spark.driver.cores= spark.executors.cores. | spark.executors.cores と等しくなるように設定することをお勧めします。 |
7 | spark.driver.memory | spark.driver.memory = spark.executors.memory | spark.executors.memory と等しくなるように設定することをお勧めします。 |
8 | spark.executor.instances | (number of executors per instance (上記の#2の結果) * number of core instances) minus 1 for the driver ■ 設定例 (9 * 19) - 1 = 170 |
エグゼキュータの数とインスタンスの合計数を乗じて計算します。ドライバー用にエグゼキュータを 1 個残しておいてください。 |
9 | spark.default.parallelism | spark.executor.instances (上記の#8の結果) * spark.executors.cores (上記の#1の結果) * 2 ■ 設定例 170 * 5 * 2 = 1,700 |
この計算の結果は 1,700 個のパーティションになるが、各パーティションのサイズを見積り、coalesce または repartition を使ってこの数値を適切に調整を推奨とのこと。 データフレームの場合は、 spark.default.parallelism と共に spark.sql.shuffle.partitions パラメータを設定します。 |
その他のパラメータについても記載します。
パラメータ | 説明 |
---|---|
spark.network.timeout | すべてのネットワークトランザクションのタイムアウト。 |
spark.executor.heartbeatInterval | ドライバーに対する各エグゼキュータのハートビートの間隔 ※ この値は、spark.network.timeout よりも大幅に少ない値である必要があるとのこと。 |
spark.memory.fraction | Spark の実行とストレージに使用される JVM ヒープ領域の割合。 この値が低いほど、スピルとキャッシュデータの削除がより頻繁に発生します。 |
spark.memory.storageFraction | spark.memory.fraction によって確保されたリージョンのサイズの一部として表されます。 この値が高いほど、実行用に利用できる作業メモリが少なくなる可能性があります。これは、タスクがディスクにより頻繁にスピルする可能性があることを意味します。 |
spark.yarn.scheduler.reporterThread.maxFailures | YARN がアプリケーションを失敗させるまでに許容されるエグゼキュータの最大失敗回数。 |
spark.rdd.compress | true に設定すると、このプロパティは RDD を圧縮することによって、追加の CPU 時間を代償にかなりの領域を節約することができます。 |
spark.shuffle.compress | true に設定すると、このプロパティはマップ出力を圧縮して領域を節約します。 |
spark.shuffle.spill.compress | true に設定すると、このプロパティはシャッフル中にスピルされたデータを圧縮します。 |
spark.sql.shuffle.partitions | 結合と集約のためのパーティションの数を設定します。 |
spark.serializer | データをシリアライズまたはデシリアライズするシリアライザーを設定します。 シリアライザーとしては、Kyro (org.apache.spark.serializer.KryoSerializer) がよい。Java のデフォルトシリアライザーよりも高速でコンパクト。 |
3.適切なガベージコレクターを実装してメモリを効率的にクリアする
ガベージコレクションは、特定のケースでメモリ不足エラーを引き起こす場合があります。これには、アプリケーション内に複数の大規模な RDD が存在するケースが含まれます。他のケースは、タスク実行メモリと RDD キャッシュメモリの間に干渉がある場合に発生します。
- 下記のような場合に、GCによりメモリ不足を引き起こす場合がある。
- アプリ内に大規模なRDDが存在する場合
- タスク実行メモリと、RDDキャッシュメモリの間に干渉がある場合
古いオブジェクトをエビクトして、メモリ内に新しいオブジェクトを設置するために、複数のガベージコレクターを使用することができます。ただし、最新の Garbage First Garbage Collector (G1GC) は、古いガベージコレクターのレイテンシーとスループットの制限を克服します。
```
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
```
- G1GCを使用するのがよい。
4.YARN 設定パラメータを設定する
すべての Spark 設定プロパティが正しく計算され、設定されているとしても、OS によって仮想メモリが積極的に増やされるときに、まれではありますが、仮想メモリ不足エラーが生じる可能性があります。これらのアプリケーションの失敗を防ぐには、YARN サイト設定に以下のフラグを設定します。
```
"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"
```
- OSにより仮想メモリ不足エラーが引き起こされる可能性がある。
5.デバッグとモニタリングを実行する
Spark 設定オプションがどこからきているかの詳細を得るには、–verbose オプションを付けて spark-submit を実行することができます。また、Ganglia および Spark UI を使ってアプリケーションの進捗状況、クラスターの RAM 使用率、ネットワーク I/O などを監視することができます。
- Ganglia、Spark UIを使って監視・確認する
実践
下記のようなシナリオで、挙動を確認します。
- EMRを起動した状態を確認
- OOM
EMRを起動した状態を確認
emr-6.10.0を起動した状態の設定を確認します。
デフォルトでは、推奨されるパラメータの設定はされていません。
-
クラスタのインスタンスグループの内容は下記のとおりです。
- プライマリグループ
- m6a.xlarge
- 4 vCore、16 GiB メモリ, EBS
- m6a.xlarge
- コアグループ
- m6a.xlarge
- 4 vCore、16 GiB メモリ, EBS
- m6a.xlarge
- プライマリグループ
-
maximizeResourceAllocationを確認します。
false
でした。$ aws emr describe-cluster --cluster-id j-XXXXXXXX --region us-west-2 | grep -10 maximizeResourceAllocation } ], "Tags": [], "ServiceRole": "arn:aws:iam::xxxxxx:role/EMR_DefaultRole", "NormalizedInstanceHours": 16, "MasterPublicDnsName": "xxxxxxxxxx.us-west-2.compute.internal", "Configurations": [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "false" } }, { "Classification": "spark-hive-site", "Properties": { "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" } } ], "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
-
EMR Studioで設定を確認します。
from pyspark.sql import SparkSession session = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() sc = session.sparkContext conf = sc.getConf()
print("spark.dynamicAllocation.enabled:[{0}]".format(conf.get("spark.dynamicAllocation.enabled"))) print("spark.dynamicAllocation.initialExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.initialExecutors"))) print("spark.dynamicAllocation.minExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.minExecutors"))) print("spark.dynamicAllocation.maxExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.maxExecutors"))) print("spark.executor.cores:[{0}]".format(conf.get("spark.executor.cores"))) print("spark.yarn.executor.memoryOverhead:[{0}]".format(conf.get("spark.yarn.executor.memoryOverhead"))) print("spark.executor.memory:[{0}]".format(conf.get("spark.executor.memory"))) print("spark.driver.cores:[{0}]".format(conf.get("spark.driver.cores"))) print("spark.driver.memory:[{0}]".format(conf.get("spark.driver.memory"))) print("spark.executor.instances:[{0}]".format(conf.get("spark.executor.instances"))) print("spark.default.parallelism:[{0}]".format(conf.get("spark.default.parallelism"))) print("spark.network.timeout:[{0}]".format(conf.get("spark.network.timeout"))) print("spark.executor.heartbeatInterval:[{0}]".format(conf.get("spark.executor.heartbeatInterval"))) print("spark.memory.fraction:[{0}]".format(conf.get("spark.memory.fraction"))) print("spark.memory.storageFraction:[{0}]".format(conf.get("spark.memory.storageFraction"))) print("spark.yarn.scheduler.reporterThread.maxFailures:[{0}]".format(conf.get("spark.yarn.scheduler.reporterThread.maxFailures"))) print("spark.rdd.compress:[{0}]".format(conf.get("spark.rdd.compress"))) print("spark.shuffle.compress:[{0}]".format(conf.get("spark.shuffle.compress"))) print("spark.shuffle.spill.compress:[{0}]".format(conf.get("spark.shuffle.spill.compress"))) print("spark.sql.shuffle.partitions:[{0}]".format(conf.get("spark.sql.shuffle.partitions"))) print("spark.serializer:[{0}]".format(conf.get("spark.serializer"))) === spark.dynamicAllocation.enabled:[true] spark.dynamicAllocation.initialExecutors:[None] spark.dynamicAllocation.minExecutors:[None] spark.dynamicAllocation.maxExecutors:[None] spark.executor.cores:[2] spark.yarn.executor.memoryOverhead:[None] spark.executor.memory:[4921M] spark.driver.cores:[None] spark.driver.memory:[2048M] spark.executor.instances:[None] spark.default.parallelism:[None] spark.network.timeout:[None] spark.executor.heartbeatInterval:[None] spark.memory.fraction:[None] spark.memory.storageFraction:[None] spark.yarn.scheduler.reporterThread.maxFailures:[None] spark.rdd.compress:[None] spark.shuffle.compress:[None] spark.shuffle.spill.compress:[None] spark.sql.shuffle.partitions:[None] spark.serializer:[None]
パラメータ 設定値 備考 spark.dynamicAllocation.enabled true spark.dynamicAllocation.initialExecutors None spark.dynamicAllocation.minExecutors None spark.dynamicAllocation.maxExecutors None spark.executor.cores 2 "5"という想定だが、想定より少ない。 spark.yarn.executor.memoryOverhead None spark.executor.memory 4921M spark.driver.cores None spark.driver.memory 2048M spark.executors.memoryという同一という想定だが異なる。 spark.executor.instances None spark.default.parallelism None spark.network.timeout None spark.executor.heartbeatInterval None spark.memory.fraction None spark.memory.storageFraction None spark.yarn.scheduler.reporterThread.maxFailures None spark.rdd.compress None spark.shuffle.compress None spark.shuffle.spill.compress None spark.sql.shuffle.partitions None spark.serializer None -
YARNのTimeline ServerのUIから
yarn.scheduler.maximum-allocation-mb
を確認します。8129MBでした。<property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>8192</value> <final>false</final> <source>yarn-default.xml</source> </property>
Executorメモリ < ファイルサイズ
ファイルサイズよりもExecutorメモリを小さくしてみます。
-
Executor(spark.executor.memory)のメモリを 1GiB(1024M)に設定してみます。ちなみにベストプラクティスに沿った場合、下記の設定になります。
- m6a.xlargeインスタンス
- 4 vCore
- 16 GiB メモリ
- Number of executors per instance = 4 vCore -1 vCore / 2 = 1
- Total executor memory = 16GiB - 1GiB / 1 = 15GiB
- spark.executor.memory = 15GiB * 0.9 = 13.5 ≒ 13 GiB
- spark.yarn.executor.memoryOverhead = 15 GiB * 0.1 = 1.5 GiB ≒ 1 GiB
-
下記のパラメータを設定してEMRクラスタを作成します。
[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "false" } }, { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "1024M" } } ]
-
設定値を確認します。設定した通り、spark.executor.memoryは1024M(1GiB)になりました。
from pyspark.sql import SparkSession session = SparkSession \ .builder \ .appName("Python example") \ .getOrCreate() sc = session.sparkContext conf = sc.getConf() print("spark.dynamicAllocation.enabled:[{0}]".format(conf.get("spark.dynamicAllocation.enabled"))) print("spark.dynamicAllocation.initialExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.initialExecutors"))) print("spark.dynamicAllocation.minExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.minExecutors"))) print("spark.dynamicAllocation.maxExecutors:[{0}]".format(conf.get("spark.dynamicAllocation.maxExecutors"))) print("spark.executor.cores:[{0}]".format(conf.get("spark.executor.cores"))) print("spark.yarn.executor.memoryOverhead:[{0}]".format(conf.get("spark.yarn.executor.memoryOverhead"))) print("spark.executor.memory:[{0}]".format(conf.get("spark.executor.memory"))) print("spark.driver.cores:[{0}]".format(conf.get("spark.driver.cores"))) print("spark.driver.memory:[{0}]".format(conf.get("spark.driver.memory"))) print("spark.executor.instances:[{0}]".format(conf.get("spark.executor.instances"))) print("spark.default.parallelism:[{0}]".format(conf.get("spark.default.parallelism"))) print("spark.network.timeout:[{0}]".format(conf.get("spark.network.timeout"))) print("spark.executor.heartbeatInterval:[{0}]".format(conf.get("spark.executor.heartbeatInterval"))) print("spark.memory.fraction:[{0}]".format(conf.get("spark.memory.fraction"))) print("spark.memory.storageFraction:[{0}]".format(conf.get("spark.memory.storageFraction"))) print("spark.yarn.scheduler.reporterThread.maxFailures:[{0}]".format(conf.get("spark.yarn.scheduler.reporterThread.maxFailures"))) print("spark.rdd.compress:[{0}]".format(conf.get("spark.rdd.compress"))) print("spark.shuffle.compress:[{0}]".format(conf.get("spark.shuffle.compress"))) print("spark.shuffle.spill.compress:[{0}]".format(conf.get("spark.shuffle.spill.compress"))) print("spark.sql.shuffle.partitions:[{0}]".format(conf.get("spark.sql.shuffle.partitions"))) print("spark.serializer:[{0}]".format(conf.get("spark.serializer"))) print("yarn.scheduler.maximum-allocation-mb:[{0}]".format(conf.get("yarn.scheduler.maximum-allocation-mb"))) === spark.dynamicAllocation.enabled:[true] spark.dynamicAllocation.initialExecutors:[None] spark.dynamicAllocation.minExecutors:[None] spark.dynamicAllocation.maxExecutors:[None] spark.executor.cores:[2] spark.yarn.executor.memoryOverhead:[None] spark.executor.memory:[1024M] spark.driver.cores:[None] spark.driver.memory:[2048M] spark.executor.instances:[None] spark.default.parallelism:[None] spark.network.timeout:[None] spark.executor.heartbeatInterval:[None] spark.memory.fraction:[None] spark.memory.storageFraction:[None] spark.yarn.scheduler.reporterThread.maxFailures:[None] spark.rdd.compress:[None] spark.shuffle.compress:[None] spark.shuffle.spill.compress:[None] spark.sql.shuffle.partitions:[None] spark.serializer:[None] yarn.scheduler.maximum-allocation-mb:[None]
-
パーティション分割せずに、1つのタスクで読み込みます。
spark.conf.set('spark.sql.files.maxPartitionBytes','12884901888') df3 = spark.read.json("s3://XXXXX/input/json/largedata/one-file/20230528-3/") df4 = df3.groupBy("group").count() df4.printSchema() df4.count()
-
成功してしまいました。想定ではエラーになると思っていました。
root |-- group: string (nullable = true) |-- count: long (nullable = false) 10
考察
今回は、主に、Sparkのメモリについての設定を学びました。
実践では、メモリの設定値を変更してエラーになるケースを試してみましたが、想定通りうまくいきませんでした。
今後、引き続きチューニングを行い、効果を検証してみようと思います。
参考