1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Glueのジョブのモニタリングとデバッグ(OOM例外とジョブの異常のデバッグ-Driver)を試してみた

Posted at

背景・目的

Glueでエラー時のデバッグと対応を試してみます。
本ページは、ドキュメントの「OOM 例外とジョブの異常のデバッグ」を元に実践してみます。

まとめ

多くのファイルを扱う場合、それぞれタスクごとにアサインされ、それらのタスクをトレースするためにDriverで多くのメモリを使用します。
(Driverのメモリの仕様状況は、Glueの画面で確認ができます。)
多くのファイルを取り扱うことでDriverのメモリの使用率が高くなっている場合には、読み込むファイルをグループ化する。
これにより、メモリ使用率が下がる可能性があります。

実装

ドライバー OOM 例外のデバッグ

本シナリオは、Sparkジョブで多数の小さいファイルを読み込んだ際に、Sparkドライバのメモリ不足によるエラーが発生するシナリオを試します。
S3上の100万ファイルを読み込み、書き出すだけのプログラムで試します。

前提

100万以上のファイルを用意しておきます。ここでは、618万のファイルを用意しました。

  • ファイルサイズ 1.2GiB
  • ファイル数 618.7万
  • 1ファイルあたりのサイズ 208バイト(1.2GiB / 618.7万)
$ aws s3 ls {S3パス} --sum  --human  | grep Total &
[1] 66794 66795
$ Total Objects: 6187801
   Total Size: 1.2 GiB
$

コード

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

INPUT_PATH  = "s3://{入力}"
OUTPUT_PATH = "s3://{出力}"

data = spark.read.format("json").option("inferSchema", False).load("{0}".format(INPUT_PATH))
data.write.format("parquet").save("{0}".format(OUTPUT_PATH))

job.commit()

実行

Spark Driverは全てのディレクトリのすべてのファイルのリストの取得を試みて、InMemoryFileIndexを構築し、ファイルごとに1つのタスクを起動する。この結果、全てのタスクを追跡するために、大量のステータスをメモリに保存する必要が生じるため、DriverでOOMが発生するとのこと。

  1. Workersを100に増やして、実行します。
  2. 30分程度でFailedになりました。
    image.png
  3. Glueコンソールでメトリクスを確認すると、DriverのMemory Usageが100%近く達していました。
    image.png

グループ化を使用して複数のファイルの処理を修正する。

読み込みファイルのグループ化により、複数ファイルを1つのグループにまとめることができる。
タスクは単一のファイルではなく、グループ全体を処理できる様になる。その結果、Sparkドライバーがメモリに保存するステータスが減り、追跡するタスクが減少する。

  1. コードを下記のように書き換えて実行します。

    • ファイルをグループ化する → groupFilesをinPartitionに
    # data = spark.read.format("json").option("inferSchema", False).load("{0}".format(INPUT_PATH))
    # data.write.format("parquet").save("{0}".format(OUTPUT_PATH))
    df = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["{0}".format(INPUT_PATH)], "recurse":True, 'groupFiles': 'inPartition'}, format="json")
    datasink = glueContext.write_dynamic_frame.from_options(frame = df, connection_type = "s3", connection_options = {"path": "{0}".format(OUTPUT_PATH)}, format = "parquet", transformation_ctx = "datasink")  
    
  2. Glueコンソールでメトリクスを確認すると、DriverのMemory Usageが80%以内に収まっていました。
    image.png

考察

細かいファイルをトレースすることにより、Driverのメモリがボトルネックになる可能性があります。
読み込むファイルをグループ化することで、解消する可能性があります。
今後は、ExecutorのOOMについても確認する予定です。

まとめ

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?