背景・目的
- S3上に100KB程度のファイルが、数千程度あった。このファイルをAthenaで参照したいがAmazon Athena のパフォーマンスチューニング Tips トップ 10(4. ファイルサイズを最適化する)(※1)から逸脱していた。
- 本ページでは、Glueで128MB以上のファイルを作成する(まとめる)方法を検証する。
※1
データ読み込みが並列で行われ、データブロックがシーケンシャルに読み込まれる場合に、クエリが効率的に実行されます。分割可能なファイルフォーマットであるようにしておくことで、ファイルの大きさに関わらず並列処理が行われます。
ただしファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。その一方で、ファイルが分割不可能でサイズが非常に大きいときには、単一のリーダーがファイル全体の読み込みを完了するまで、クエリ自体の処理は行われません。この場合、並列性が下がってしまいます。
結論
- ファイルサイズを大きくするには、入力ファイルのグループ化と、出力パーティション数の調整が必要。
内容
インプットサイズを調整
-
大きなグループの入力ファイルの読み取りを参考に、読み込むファイルを一定以上の大きさでグループ化する。
-
グループ化するには、以下のパラメータをcreate_dynamic_frame関数のoptionsに設定する。
キー | 値 |
---|---|
groupFiles | inPartition |
groupSize | サイズ(バイト) |
- 以下に例を記載する。
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={
"paths": [
"s3://バケット名/パス/"
],
"recurse": True,
'groupFiles': 'inPartition',
'groupSize': '268435456'
},
transformation_ctx="S3bucket_node1",
)
- これにより、ファイルあたり、80MB〜100MB程度までまとめることができたが、128MB以上にならなかったので更に検討する。
出力パーティション数を調整する。
- coalesce() を使用して出力パーティション数を減らす。
- coalesce(出力パーティション数)でパーティション数が決まる。
- パーティション数を減らしすぎるとファイルサイズが大きくなりすぎる。また、GlueジョブでOOMや処理時間を要するなど、副作用があるので注意が必要。
- 適度なパーティション数に変更する方法として、Dynamic Frame の coalesce() でパーティション数を減らす
を参考に定数で割る方法を試してみた。以下に例を記載する。
current_partition_num = ApplyMapping_node2.getNumPartitions()
# パーティション数を半分に変更する。
custom_partition_num = int( current_partition_num / 2)
dynamic_frame = ApplyMapping_node2.coalesce(custom_partition_num)
結果
- 128MB以上になり、ファイル数もかなり減らすことができました。
コード全体
- コードを載せます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={
"paths": [
"s3://バケット名/パス/"
],
"recurse": True,
'groupFiles': 'inPartition',
'groupSize': '268435456'
},
transformation_ctx="S3bucket_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("column1", "string", "column1", "string"),
],
transformation_ctx="ApplyMapping_node2",
)
current_partition_num = ApplyMapping_node2.getNumPartitions()
# パーティション数を半分に変更する。
custom_partition_num = int( current_partition_num / 2)
dynamic_frame = ApplyMapping_node2.coalesce(custom_partition_num)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://バケット名/パス名/",
"partitionKeys": [],
},
format_options={"compression": "snappy"},
transformation_ctx="S3bucket_node3",
)
job.commit()
考察
- インプットファイルのグループ化だけでは不十分。出力パーティションの調整も必要。
- GlueのDynamicFrameの関数やオプションを学ぶことができた。まだまだ知らないことが多いので継続的な学習が必要。
- 今後も課題ベースでの学習を継続的に実施する。
参考