やりたいこと
この図のように、S3上のTableAというフォルダ配下にyear=xxxx
というHive形式のパーティションパスが存在し、それが既にカタログ化されています。
そのパーティションパス配下にある複数のファイルを、1つのファイルに集約したい状況です。
集約したい理由
このデータカタログを読み取る際の性能のためです。
GlueでデータをS3上に出力すると、基本的には複数のファイルに分割されて出力されます。これは、Sparkが分散処理しているためです。
もし小さいサイズのデータを複数のファイルに分割して出力した場合、1つ1つのファイルサイズは非常に小さくなってしまいます。
Athenaでデータを読み込む際のTipsの「4.ファイルサイズを最適化する」に記載の通り、ファイルサイズが非常に小さい場合は読み取りの性能が下がってしまいます。
ただしファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。その一方で、ファイルが分割不可能でサイズが非常に大きいときには、単一のリーダーがファイル全体の読み込みを完了するまで、クエリ自体の処理は行われません。この場合、並列性が下がってしまいます。
前提条件
Glue version3.0を利用しています。
Sparkのversionは3.1.1です。
やったこと
ファイルを集約するようなGlueジョブを作成しました。
作成したGlueジョブのコード
データベース名やテーブル名などは「# 変数設定」で、定義しています。適宜変更してください。
ポイントは、コメントにもある以下3つです。
- ①パーティションの集約
- ②キャッシュに永続化
- ③アクション実行
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# 変数設定
database_name = "mydb"
table_name = "TableA"
partition_col_list = ["year"]
output_s3_path = "s3://sample-bucket/TableA/"
# カタログからロード
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database=database_name,
table_name=table_name,
transformation_ctx="S3bucket_node1",
)
# ポイント①:パーティションの集約
S3bucket_node2 = S3bucket_node1.coalesce(1)
# ポイント②:キャッシュに永続化
df = S3bucket_node2.toDF()
cachedDf = df.persist()
# ポイント③:アクション実行
cachedDf.count()
# S3に上書き
cachedDf.write.mode("overwrite").partitionBy(partition_col_list).parquet(output_s3_path)
job.commit()
①パーティションの集約
# ポイント①:パーティションの集約
S3bucket_node2 = S3bucket_node1.coalesce(1)
ここでは、パーティションを1つにしてそのパーティションにデータを集約しています。
Sparkでデータを出力する際は、1パーティションにつき1ファイルが出力されます。
そのため、coalesce(1)を実施することでパーティション数を1つにして、強制的に1ファイルの出力にしています。
coalesce()については、以下のBlackBeltのp.41に載っています。
また、パーティションについては以下の記事が詳しいです。
なぜそのままデータを出力してはいけないか?
①パーティションの集約を実施したら、そのままDataFrameに変換してoverwriteでデータを書き込めば良いのでは?と思われた方もいると思います。つまり、以下のようなプログラムです(ライブラリインポートや変数定義など省いています)。
# カタログからロード
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database=database_name,
table_name=table_name,
transformation_ctx="S3bucket_node1",
)
# ポイント①:パーティションの集約
S3bucket_node2 = S3bucket_node1.coalesce(1)
# S3に上書き
df = S3bucket_node2.toDF()
df.write.mode("overwrite").partitionBy(partition_col_list).parquet(output_s3_path)
このジョブを実行すると、以下のようなエラーが発生します。
また、元データであるS3上のデータが削除されてしまっています。
An error occurred while calling o104.parquet. No such file or directory 's3://xxx/yyy.snappy.parquet'
エラーから推測するに、書き込みの際にデータの削除までは実行されて、そこから再度データを読み込んで書き込もうとしていると考えられます。おそらくSparkの遅延評価(アクションが実行されるまで処理が実行されない)が関わっていそうです。
そのため、方針として、データが削除されても良いようにキャッシュに永続化をして、さらにその永続化が実行されるように何かアクションを実行することを考えます。
②キャッシュに永続化
# ポイント②:キャッシュに永続化
df = S3bucket_node2.toDF()
cachedDf = df.persist()
ここでは、persist()を実施してDataFrameのデータをメモリに保存しておくことができます。
「なぜそのままデータを出力してはいけないか?」にも記載しましたが、Sparkは遅延評価されるためプログラムの上から順次実行されていくわけではありません。アクション処理が記載された行に来て、初めてそこまでのコードが実行されます。そのため、後でそのデータを使いたいと思っても、また最初から実行しなおされてしまいます。
そうしないために、このpersist()を呼ぶことでメモリ上にデータを保存しておくことができます。あとでこの永続化したデータを呼び出した時は、最初から順次実行されるのではなく、代わりにメモリに保存してあるデータをそのまま利用できます。
遅延評価やpersistについては、以下の記事とBlackBeltが詳しいです。
③アクション実行
# ポイント③:アクション実行
cachedDf.count()
最後のポイントです。coalesce(1)やpersist()はアクションではなくトランスフォーメーション処理であるため、ここまでの処理を実行するためにアクションを実施します。ここでアクションを実行することで、パーティションが1つになってそのデータがメモリに永続化されます。
今回はcount()にしましたが、それ以外のアクション処理でも大丈夫です。
アクションの例は、先ほど載せたBlackBeltのp.16に以下のような記載があります。
注意点
1. S3とデータカタログの同期処理は実施していない
通常、S3のデータを更新したらデータカタログもそれに合わせて更新します。
その方法はクローラーを実行する方法や、Glueジョブ内で更新を実行する方法など色々ありますが、今回はそれを実施していません。
ただ、今回はそれを実施しなくても特に影響はないです。
なぜなら、今回はあくまでS3上のファイルの形式を変更しているにすぎず、データの内容については特に変更しないからです。そのため、このジョブを実施することでデータの内容が変更されたり、データが増えたりすることはないので、特にデータカタログとの同期処理は不要です。
2. coalesce(1)とパーティション数を1つに指定している
今回はcoalesce(1)でパーティションを集約していますが、これはファイルのサイズがそこまで大きくないことを前提としています。
ファイルのサイズが大きいと、そもそも1つのExecutorの容量を超えてしまってジョブがエラーになったり、ジョブエラーにはならなくとも1つのファイルサイズがベストプラクティスの128MBを超えてしまったりする可能性があります。
そのため、ファイルサイズが大きくなる場合には、coalesce(n)として後からnの数を変えるようにしたり(ジョブパラメーターに設定しておくと良い)、S3のファイルサイズを読み込んで動的に最適な分割数を決めるロジックを組み込んだりする必要があります。
以下は、後者の動的に分割数を決めるやり方の参考になります。
まとめと感想
今回は、データカタログ化されているS3上のファイルの数を1つに集約する方法を述べました。
ファイル数を1つにするためにはcoalesce()を使ってパーティションを集約し、元のデータを上書きするためにはpersist()でメモリ上にデータを永続化して、それを実行するためのアクションを実施しました。
今回の方法ではデータカタログの同期はしていませんがデータの更新はないため特に問題はなさそうです。また、データのサイズが大きくなってきたときはcoalesce()で指定するパーティションの分割数を検討する必要があります。
ファイル数の集約はもっと簡単にできるかと思いましたが、遅延評価などのSpark特有の部分に引っかかり結構ハマりました。ただ、理解は深まった気がします。