背景・目的
Glueでエラー時のデバッグと対応を試してみます。
本ページは、ドキュメントの「OOM 例外とジョブの異常のデバッグ」を元に実践してみます。
まとめ
多くのファイルを扱う場合、それぞれタスクごとにアサインされ、それらのタスクをトレースするためにDriverで多くのメモリを使用します。
(Driverのメモリの仕様状況は、Glueの画面で確認ができます。)
多くのファイルを取り扱うことでDriverのメモリの使用率が高くなっている場合には、読み込むファイルをグループ化する。
これにより、メモリ使用率が下がる可能性があります。
実装
ドライバー OOM 例外のデバッグ
本シナリオは、Sparkジョブで多数の小さいファイルを読み込んだ際に、Sparkドライバのメモリ不足によるエラーが発生するシナリオを試します。
S3上の100万ファイルを読み込み、書き出すだけのプログラムで試します。
前提
100万以上のファイルを用意しておきます。ここでは、618万のファイルを用意しました。
- ファイルサイズ 1.2GiB
- ファイル数 618.7万
- 1ファイルあたりのサイズ 208バイト(1.2GiB / 618.7万)
$ aws s3 ls {S3パス} --sum --human | grep Total &
[1] 66794 66795
$ Total Objects: 6187801
Total Size: 1.2 GiB
$
コード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
INPUT_PATH = "s3://{入力}"
OUTPUT_PATH = "s3://{出力}"
data = spark.read.format("json").option("inferSchema", False).load("{0}".format(INPUT_PATH))
data.write.format("parquet").save("{0}".format(OUTPUT_PATH))
job.commit()
実行
Spark Driverは全てのディレクトリのすべてのファイルのリストの取得を試みて、InMemoryFileIndexを構築し、ファイルごとに1つのタスクを起動する。この結果、全てのタスクを追跡するために、大量のステータスをメモリに保存する必要が生じるため、DriverでOOMが発生するとのこと。
- Workersを100に増やして、実行します。
- 30分程度でFailedになりました。
- Glueコンソールでメトリクスを確認すると、DriverのMemory Usageが100%近く達していました。
グループ化を使用して複数のファイルの処理を修正する。
読み込みファイルのグループ化により、複数ファイルを1つのグループにまとめることができる。
タスクは単一のファイルではなく、グループ全体を処理できる様になる。その結果、Sparkドライバーがメモリに保存するステータスが減り、追跡するタスクが減少する。
-
コードを下記のように書き換えて実行します。
- ファイルをグループ化する → groupFilesをinPartitionに
# data = spark.read.format("json").option("inferSchema", False).load("{0}".format(INPUT_PATH)) # data.write.format("parquet").save("{0}".format(OUTPUT_PATH)) df = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["{0}".format(INPUT_PATH)], "recurse":True, 'groupFiles': 'inPartition'}, format="json") datasink = glueContext.write_dynamic_frame.from_options(frame = df, connection_type = "s3", connection_options = {"path": "{0}".format(OUTPUT_PATH)}, format = "parquet", transformation_ctx = "datasink")
考察
細かいファイルをトレースすることにより、Driverのメモリがボトルネックになる可能性があります。
読み込むファイルをグループ化することで、解消する可能性があります。
今後は、ExecutorのOOMについても確認する予定です。
まとめ