Glue Jobには新たに追加されたデータだけを処理するブックマークの機能があります。前回実行時にどこまで処理したかを記憶しておいて、次に実行するときに追加されたファイルのみを処理できます。
ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue
Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではオブジェクトの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。入力ソースデータが最後のジョブ実行以降に変更されている場合、ジョブを再度実行すると、ファイルが再度処理されます。
と書かれています。
S3のファイル名やパーティション関係なくタイムスタンプだけを見るので、過去のパーティションにファイルが追加されてもGlueはそれを検知して処理します。
また、同じファイル名で上書き更新されたファイルがあった場合も、Glueはそれを見つけ出して処理されます。書き出し先がS3の場合は処理された分がそのまま追加されますので、同じファイル名で同じ中身でタイムスタンプだけ更新されると、書き出し先に同じ内容のファイルが重複することになります。
したがってブックマークの情報としては前回処理したタイムスタンプが保持されていると想像つきますが、念のためブックマークの情報をawscliで見てみました。
結論
こんな情報が入っていました。
{
"S3bucket_node1": {
"jsonClass": "HadoopDataSourceJobBookmarkState",
"timestamps": {
"RUN": "25",
"HIGH_BAND": "900000",
"CURR_LATEST_PARTITION": "1676437901000",
"CURR_LATEST_PARTITIONS": "s3://S3_BUCKET_NAME/glue-job-bookmark-test/date=20230201/",
"CURR_RUN_START_TIME": "2023-02-15T09:54:30.984Z",
"INCLUDE_LIST": "9b79...,..."
}
}
}
CURR_RUN_START_TIME
がタイムスタンプです。このタイムスタンプと新しいS3オブジェクトのタイムスタンプを比較して、処理すべきファイルかどうかを判定しています。
RUN
にはGlue Jobを何回実行したかの数値のようです。ブックマークをリセットしてもこの数値は0には戻りませんでした。
CURR_LATEST_PARTITION
にはGlue CrawlerがクロールしてData Catalogにパーティションの情報を書き込んだ日時が入っているように見えます。
それ以外の項目は不明です。
確認できた事実
入力元がS3で、出力先もS3で私が試した結果です。
- タイムスタンプが新しければ過去のパーティションであってもGlue Jobは処理する
- タイムスタンプの比較は
CURR_RUN_START_TIME
が使われている - Glue Crawlerで認識しているパーティション(Data Catalogに保存されているパーティション一覧)の中にあるオブジェクトのタイムスタンプがチェックされる
- Glue Crawlerでパーティションを認識させないとGlue Jobは新しいパーティションにあるファイルを処理しない
- 逆に、認識済みのパーティションに新規ファイルを追加した場合には、Glue Crawlerを動かさなくても、Glue Jobが新規ファイルを検知して処理する
試したこと
1. S3に /date=20230201/
というパーティションでCSVファイルを1つ置きます。
2. Crawlerを作成し、実行します。
3. Glue Jobを作成します。
Job未実行のこの時点ではブックマークは存在しません。
$ aws glue get-job-bookmark --job-name glue-job-bookmark-test
An error occurred (EntityNotFoundException) when calling the GetJobBookmark operation: Continuation for job glue-job-bookmark-test not found
4. Glue Jobを実行します。
出力先S3にオブジェクトが1つ生成されます。
ブックマークの情報をawscliで見てみます。
$ aws glue get-job-bookmark --job-name glue-job-bookmark-test
{
"JobBookmarkEntry": {
"JobName": "glue-job-bookmark-test",
"Version": 2,
"Run": 2,
"Attempt": 0,
"RunId": "jr_f09b...",
"JobBookmark": "{\"S3bucket_node1\":{\"jsonClass\":\"HadoopDataSourceJobBookmarkState\",\"timestamps\":{\"RUN\":\"1\",\"HIGH_BAND\":\"900000\",\"CURR_LATEST_PARTITION\":\"1676437901000\",\"CURR_LATEST_PARTITIONS\":\"s3://S3_BUCKET_NAME/glue-job-bookmark-test/date=20230201/\",\"CURR_RUN_START_TIME\":\"2023-02-15T05:27:10.140Z\",\"INCLUDE_LIST\":\"\"}}}"
}
}
JobBookmark
という項目にJSONが文字列で入っているようです。次のようにjqコマンドを2つつなぐとJSONを見やすくできます。
$ aws glue get-job-bookmark --job-name glue-job-bookmark-test | jq .JobBookmarkEntry.JobBookmark -r | jq
{
"S3bucket_node1": {
"jsonClass": "HadoopDataSourceJobBookmarkState",
"timestamps": {
"RUN": "1",
"HIGH_BAND": "900000",
"CURR_LATEST_PARTITION": "1676437901000",
"CURR_LATEST_PARTITIONS": "s3://S3_BUCKET_NAME/glue-job-bookmark-test/date=20230201/",
"CURR_RUN_START_TIME": "2023-02-15T05:27:10.140Z",
"INCLUDE_LIST": ""
}
}
}
5. 2つ目のCSVファイルをS3に /date=20230202/
という新しいパーティションで置きます。
6. Glue Jobを動かします。
結果、2つ目のCSVファイルは処理してくれませんでした。
Crawlerで新しいパーティションを認識しないと処理してくれないようです。
7. Glue Crawlerを動かします。
2つ目のパーティションが認識できました。
Glue Data Catalogに登録されたパーティションは以下のようなコマンドで確認できます。
$ aws glue get-partitions --database-name glue-job-bookmark-test-target --table-name glue_job_bookmark_test | jq '.Partitions[]|.Values'
[
"20230202"
]
[
"20230201"
]
8. Glue Jobを動かします。
Glue Jobは2つ目のCSVファイルを処理しました。
9. 3つ目のCSVファイルを1つ目のパーティション/date=20230201/
に置きます。
これは過去のパーティションに新しいファイルを設置しても認識できるかどうかの確認です。
10. Glue Jobを動かします。
Glue Jobは3つ目のCSVファイルを処理しました。
タイムスタンプが新しければ過去のパーティションであっても処理するようです。これは公式ドキュメントに書いてある通りの挙動です。
また、Data Catalogに登録済みのパーティション(Crawlerが認識済みのパーティション)に新規ファイルを追加した場合には、Glue Crawler実行不要でGlue Jobが処理できるようです。
まとめ
ブックマークに保存されている情報を再掲しますが、ここまでの実験から、CURR_RUN_START_TIME
を見て、これより新しいタイムスタンプのファイルを処理しているようです。CURR_LATEST_PARTITIONS
などは関係なさそうです。
$ aws glue get-job-bookmark --job-name glue-job-bookmark-test | jq .JobBookmarkEntry.JobBookmark -r | jq
{
"S3bucket_node1": {
"jsonClass": "HadoopDataSourceJobBookmarkState",
"timestamps": {
"RUN": "4",
"HIGH_BAND": "900000",
"CURR_LATEST_PARTITION": "1676439461000",
"CURR_LATEST_PARTITIONS": "s3://S3_BUCKET_NAME/glue-job-bookmark-test/date=20230202/",
"CURR_RUN_START_TIME": "2023-02-15T05:46:27.588Z",
"INCLUDE_LIST": "dcfe...,838e...,..."
}
}
}
追加の実験
前回実行結果のCURR_RUN_START_TIME
より新しいファイルを処理することはわかったのですが、その挙動の場合、新しく実行した結果保存されるCURR_RUN_START_TIME
の値よりも前のファイルを必ず処理するのは当然なのですが、逆にCURR_RUN_START_TIME
よりも新しいファイルは処理してはいけないはずです。その確認のための実験をしてみました。
CURR_RUN_START_TIME
という名前からしてGlue Jobの実行開始の時間が保存されていそうなので、Glue Jobが起動した直後でまた処理が始まってなさそうなタイミングでS3オブジェクトをアップロードしたらどうなるかです。このファイルを処理してしまったら、その次のGlue Job実行と重複処理してしまいます。
そこで、Glue Jobを起動して直後にS3にファイルをアップロードする実験を繰り返しました。
CURR_RUN_START_TIME
の値を観察していると、Glue Jobを起動操作してから30秒ほど経ってからの時間のようです。おそらくGlue Jobを実行するインスタンスを準備するための時間があるので、30秒ほどのラグがあると思われます。したがってGlue Jobを起動操作してから30秒ほど待ってからS3にアップロードし、実行完了後にS3オブジェクトのタイムスタンプとCURR_RUN_START_TIME
を比較して、偶然近くなったタイミングの処理内容を調べました。
その結果、たしかにCURR_RUN_START_TIME
より前のファイルは必ず処理され、後のファイルは処理されないように見えます。CURR_RUN_START_TIME
直後のファイルをGlue Jobは無視しているように見えます。
10回ぐらい繰り返して観察した結果にすぎないので、確かな情報はわからないのですが、いちおう公式ドキュメント通りの挙動のように思えました。