はじめに
前回の記事ではS3にファイルがアップロードされたことを契機にEventBridge経由でステートマシンを実行する方法を投稿ました。
しかしステートマシンを動かすために必要なファイルが複数ある場合、S3のファイルアップロード契機だけだと1ファイルごとにステートマシンが実行されてしまい、不必要な実行がされてしまったり、誤った結果が保存されてしまう危険があります。
そのため、今回の記事ではステートマシンが必要とするS3のファイルが複数の場合で、それら全てのファイルが処理実行日に作成されたものか(昨日作成されたものをステートマシンで処理しようとしていないかどうか)を確認した後でステートマシンを実行する実装例を記載しました。
※前回の記事
使うAWSサービス
・Amazon EventBridge
・Amazon S3
・AWS Lambda
・AWS StepFunctions
アーキテクチャ図
アーキテクチャ説明
1.S3バケットの通知設定
※前回記事参照
2.EventBridgeの設定
①EventBridgeルールを作成
※前回記事参照
※今回の記事では実行ターゲットをLambdaに変えている
3.LambdaでS3のファイルの更新日時チェックとステートマシン実行
①更新日時をチェックするコード例
・この例ではS3にアップロードされたファイルが今日更新されたものかを判定している
import boto3
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
# S3バケットとフォルダの指定
bucket_name = '【バケット名】'
folder_path = '【更新日時を検証するフォルダパス】'
# 現在の日時を取得(JST)
jst = timezone('Asia/Tokyo')
current_datetime_jst = datetime.now(jst)
# 年と月の形式に変換した現在日時
current_ymd = current_datetime_jst.strftime('%Y%m%d')
# S3クライアントの作成
s3_client = boto3.client('s3')
# S3バケット内の指定したフォルダ内のオブジェクト一覧を取得
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
# 最終更新日と現在日を比較
all_files_today = all(
obj['LastModified'].astimezone(jst).strftime('%Y%m%d') == current_ymd
for obj in response.get('Contents', [])
)
②ステートマシンを実行するコード例 ※上記コードの続き
・上記コードで判定された結果をもとにステートマシンを実行するか、そのまま終了とするかを分岐している
if all_files_today:
# 全てのファイルが今日更新されたものであればステートマシンを実行する
stepfunctions_client = boto3.client('stepfunctions')
state_machine_arn = '【ステートマシンのARN】'
response = stepfunctions_client.start_execution(
stateMachineArn=state_machine_arn,
input='{}'
)
else:
# 今日の更新年月に最終更新されたファイルが一部またはまったくない場合、何もせずに正常終了
# ログにS3ファイルがそろっていない旨のメッセージを出すなどしたほうが良い
return {
'statusCode': 200,
'body': 'Success'
}
さいごに
他システムからファイルが連携されてそれを使って日次バッチを実行するようなシステムの場合、EventBridgeの定時実行で確実にファイルが揃う時間に実行開始時間を設定することも良いと思います。ただし前回の記事でも書いたように連携元システムの何かしらの問題でその定時に連携が間に合わなかった場合に古いファイルで処理が実行されれてしまいます。
かといってS3ファイルのアップロードを契機に実行する設定をEventBridgeで行った場合はファイルのアップロードのたびに実行されたりなどの問題が発生します。
全てのファイルが確実に最新のものである状態でシステムを動かすため、Lambda等ですべてのファイルがそろっているかを判定したのちにステートマシンに処理を移すようなチェック機構は有効だと思います。
これによりステートマシンが条件がそろったときのみ動くことが保証され余計なコストの発生も防ぐことができると期待できます