1
1

More than 1 year has passed since last update.

Glueでファイルをまとめる方法について

Last updated at Posted at 2021-12-22

背景・目的

※1

データ読み込みが並列で行われ、データブロックがシーケンシャルに読み込まれる場合に、クエリが効率的に実行されます。分割可能なファイルフォーマットであるようにしておくことで、ファイルの大きさに関わらず並列処理が行われます。

ただしファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。その一方で、ファイルが分割不可能でサイズが非常に大きいときには、単一のリーダーがファイル全体の読み込みを完了するまで、クエリ自体の処理は行われません。この場合、並列性が下がってしまいます。

結論

  • ファイルサイズを大きくするには、入力ファイルのグループ化と、出力パーティション数の調整が必要。

内容

インプットサイズを調整

キー
groupFiles inPartition
groupSize サイズ(バイト)
  • 以下に例を記載する。
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://バケット名/パス/"
        ],
        "recurse": True,
        'groupFiles': 'inPartition',
        'groupSize': '268435456'

    },
    transformation_ctx="S3bucket_node1",
)

  • これにより、ファイルあたり、80MB〜100MB程度までまとめることができたが、128MB以上にならなかったので更に検討する。

出力パーティション数を調整する。

  • coalesce() を使用して出力パーティション数を減らす。
  • coalesce(出力パーティション数)でパーティション数が決まる。
  • パーティション数を減らしすぎるとファイルサイズが大きくなりすぎる。また、GlueジョブでOOMや処理時間を要するなど、副作用があるので注意が必要。
  • 適度なパーティション数に変更する方法として、Dynamic Frame の coalesce() でパーティション数を減らす を参考に定数で割る方法を試してみた。以下に例を記載する。
current_partition_num = ApplyMapping_node2.getNumPartitions()

# パーティション数を半分に変更する。
custom_partition_num = int( current_partition_num / 2)

dynamic_frame = ApplyMapping_node2.coalesce(custom_partition_num)

結果

  • 128MB以上になり、ファイル数もかなり減らすことができました。

image.png

コード全体

  • コードを載せます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://バケット名/パス/"
        ],
        "recurse": True,
        'groupFiles': 'inPartition',
        'groupSize': '268435456'

    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("column1", "string", "column1", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

current_partition_num = ApplyMapping_node2.getNumPartitions()

# パーティション数を半分に変更する。
custom_partition_num = int( current_partition_num / 2)

dynamic_frame = ApplyMapping_node2.coalesce(custom_partition_num)


# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://バケット名/パス名/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)

job.commit()

考察

  • インプットファイルのグループ化だけでは不十分。出力パーティションの調整も必要。
  • GlueのDynamicFrameの関数やオプションを学ぶことができた。まだまだ知らないことが多いので継続的な学習が必要。
  • 今後も課題ベースでの学習を継続的に実施する。

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1